Skip to content

Commit

Permalink
Merge branch 'main' into yuchen/remove-adjacent-vectored-read-builder
Browse files Browse the repository at this point in the history
  • Loading branch information
yliang412 authored Sep 27, 2024
2 parents 5193ff8 + 42ef08d commit 677f9a4
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 32 additions & 1 deletion compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use std::time::Instant;

use anyhow::{Context, Result};
Expand Down Expand Up @@ -710,7 +711,7 @@ impl ComputeNode {
info!("running initdb");
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
Command::new(initdb_bin)
.args(["-D", pgdata])
.args(["--pgdata", pgdata])
.output()
.expect("cannot start initdb process");

Expand Down Expand Up @@ -1398,6 +1399,36 @@ LIMIT 100",
}
Ok(remote_ext_metrics)
}

/// Waits until current thread receives a state changed notification and
/// the pageserver connection strings has changed.
///
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
unchanged
})
.unwrap();
if !unchanged {
info!("Pageserver config changed");
}
}
}

pub fn forward_termination_signal() {
Expand Down
13 changes: 6 additions & 7 deletions compute_tools/src/lsn_lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ fn lsn_lease_bg_task(
.max(valid_duration / 2);

info!(
"Succeeded, sleeping for {} seconds",
"Request succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
thread::sleep(sleep_duration);
compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
}
}

Expand Down Expand Up @@ -89,10 +89,7 @@ fn acquire_lsn_lease_with_retry(
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token.clone());
} else {
info!("Storage auth token not set");
}
config
})
Expand All @@ -108,9 +105,11 @@ fn acquire_lsn_lease_with_retry(
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
}
Err(e) => {
warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");

thread::sleep(Duration::from_millis(retry_period_ms as u64));
compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
retry_period_ms as u64,
));
retry_period_ms *= 1.5;
retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
}
Expand Down
1 change: 1 addition & 0 deletions control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ anyhow.workspace = true
camino.workspace = true
clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
humantime.workspace = true
nix.workspace = true
once_cell.workspace = true
Expand Down
26 changes: 18 additions & 8 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,17 +894,27 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
// to pass these on to postgres.
let storage_controller = StorageController::from_env(env);
let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
let pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
let pageservers = futures::future::try_join_all(
locate_result.shards.into_iter().map(|shard| async move {
if let ComputeMode::Static(lsn) = endpoint.mode {
// Initialize LSN leases for static computes.
let conf = env.get_pageserver_conf(shard.node_id).unwrap();
let pageserver = PageServerNode::from_env(env, conf);

pageserver
.http_client
.timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
.await?;
}

anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>();
))
}),
)
.await?;
let stripe_size = locate_result.shard_params.stripe_size;

(pageservers, stripe_size)
Expand Down
2 changes: 1 addition & 1 deletion control_plane/src/storage_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl StorageController {

if !tokio::fs::try_exists(&pg_data_path).await? {
let initdb_args = [
"-D",
"--pgdata",
pg_data_path.as_ref(),
"--username",
&username(),
Expand Down
4 changes: 2 additions & 2 deletions libs/postgres_ffi/wal_craft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ impl Conf {
);
let output = self
.new_pg_command("initdb")?
.arg("-D")
.arg("--pgdata")
.arg(&self.datadir)
.args(["-U", "postgres", "--no-instructions", "--no-sync"])
.args(["--username", "postgres", "--no-instructions", "--no-sync"])
.output()?;
debug!("initdb output: {:?}", output);
ensure!(
Expand Down
18 changes: 18 additions & 0 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,4 +736,22 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}

pub async fn timeline_init_lsn_lease(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
self.mgmt_api_endpoint,
);

self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}
17 changes: 13 additions & 4 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ async fn get_lsn_by_timestamp_handler(

let lease = if with_lease {
timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.inspect_err(|_| {
warn!("fail to grant a lease to {}", lsn);
})
Expand Down Expand Up @@ -1692,9 +1692,18 @@ async fn lsn_lease_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;

let result = async {
timeline
.init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| {
ApiError::InternalServerError(
e.context(format!("invalid lsn lease request at {lsn}")),
)
})
}
.instrument(info_span!("init_lsn_lease", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await?;

json_response(StatusCode::OK, result)
}
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl PageServerHandler {
set_tracing_field_shard_id(&timeline);

let lease = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.inspect_err(|e| {
warn!("{e}");
})
Expand Down
Loading

0 comments on commit 677f9a4

Please sign in to comment.