Skip to content

Commit

Permalink
neon_local: add a flock to protect against concurrent execution (#1…
Browse files Browse the repository at this point in the history
…0185)

## Problem

`neon_local` has always been unsafe to run concurrently with itself: it
uses simple text files for persistent state, and concurrent runs will
step on each other.

In some test environments we intentionally handle this with mutexes in
python land, but it's fragile to try and always remember to do that.

## Summary of changes

- Add a `flock` based mutex around the `main` function of neon_local,
using the repo directory as the file to lock
- Clean up an Option<> around control_plane_api, this is a drive-by
change because it was one of the fields that had a weird effect when
previous concurrent stuff stamped on it.
  • Loading branch information
jcsp authored Dec 18, 2024
1 parent d63602c commit 835287b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 43 deletions.
59 changes: 38 additions & 21 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
Expand All @@ -36,6 +37,8 @@ use safekeeper_api::{
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
Expand Down Expand Up @@ -689,6 +692,21 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}

/// A flock-based guard over the neon_local repository directory
struct RepoLock {
_file: File,
}

impl RepoLock {
fn new() -> Result<Self> {
let repo_dir = File::open(local_env::base_path())?;
let repo_dir_fd = repo_dir.as_raw_fd();
flock(repo_dir_fd, FlockArg::LockExclusive)?;

Ok(Self { _file: repo_dir })
}
}

// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
Expand All @@ -700,9 +718,14 @@ fn main() -> Result<()> {
let cli = Cli::parse();

// Check for 'neon init' command first.
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
handle_init(&args).map(|env| Some(Cow::Owned(env)))
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
} else {
// This tool uses a collection of simple files to store its state, and consequently
// it is not generally safe to run multiple commands concurrently. Rather than expect
// all callers to know this, use a lock file to protect against concurrent execution.
let _repo_lock = RepoLock::new().unwrap();

// all other commands need an existing config
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
Expand All @@ -728,11 +751,12 @@ fn main() -> Result<()> {
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};

if &original_env != env {
let subcommand_result = if &original_env != env {
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
} else {
subcommand_result.map(|()| None)
}
};
(subcommand_result, Some(_repo_lock))
};

match subcommand_result {
Expand Down Expand Up @@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
} else {
// User (likely interactive) did not provide a description of the environment, give them the default
NeonLocalInitConf {
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
},
Expand Down Expand Up @@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
broker::start_broker_process(env, &retry_timeout).await
});

// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
}
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});

for ps_conf in &env.pageservers {
js.spawn(async move {
Expand Down Expand Up @@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);

if env.control_plane_api.is_none() {
return Ok(());
}

let storcon = StorageController::from_env(env);

let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
Expand Down
10 changes: 5 additions & 5 deletions control_plane/src/local_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub struct LocalEnv {

// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Option<Url>,
pub control_plane_api: Url,

// Control plane upcall API for storage controller. If set, this will be propagated into the
// storage controller's configuration.
Expand Down Expand Up @@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Option<Url>>,
pub control_plane_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Option<Url>>,
}

Expand Down Expand Up @@ -535,7 +535,7 @@ impl LocalEnv {
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api,
branch_name_mappings,
}
Expand Down Expand Up @@ -638,7 +638,7 @@ impl LocalEnv {
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
control_plane_api: self.control_plane_api.clone(),
control_plane_api: Some(self.control_plane_api.clone()),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
branch_name_mappings: self.branch_name_mappings.clone(),
},
Expand Down Expand Up @@ -768,7 +768,7 @@ impl LocalEnv {
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
control_plane_api: control_plane_api.unwrap_or_default(),
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
branch_name_mappings: Default::default(),
};
Expand Down
28 changes: 13 additions & 15 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,19 @@ impl PageServerNode {

let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];

if let Some(control_plane_api) = &self.env.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));

// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
overrides.push(format!(
"control_plane_api='{}'",
self.env.control_plane_api.as_str()
));

// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}

if !conf.other.contains_key("remote_storage") {
Expand Down
4 changes: 2 additions & 2 deletions control_plane/src/storage_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl StorageController {
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();

let listen = format!(
"{}:{}",
Expand Down Expand Up @@ -708,7 +708,7 @@ impl StorageController {
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),
Expand Down

1 comment on commit 835287b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7245 tests run: 6936 passed, 1 failed, 308 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (2)

Postgres 14

Code coverage* (full report)

  • functions: 31.3% (8395 of 26831 functions)
  • lines: 48.0% (66631 of 138896 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
835287b at 2024-12-18T18:54:31.343Z :recycle:

Please sign in to comment.