Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed Oct 16, 2024
1 parent 12d98cf commit 108ec11
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 44 deletions.
67 changes: 39 additions & 28 deletions nexus/db-queries/src/db/datastore/clickhouse_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,30 +95,7 @@ impl DataStore {
let num_inserted = if policy.version == 1 {
self.clickhouse_policy_insert_first_policy(opctx, &policy).await?
} else {
let prev_version = policy.version - 1;

sql_query(
r"INSERT INTO clickhouse_policy
(version, clickhouse_mode, clickhouse_cluster_target_servers,
clickhouse_cluster_target_keepers, time_created)
SELECT $1, $2, $3, $4, $5
FROM clickhouse_policy WHERE version = $6 AND version IN
(SELECT version FROM clickhouse_policy
ORDER BY version DESC LIMIT 1)",
)
.bind::<sql_types::BigInt, SqlU32>(policy.version.into())
.bind::<ClickhouseModeEnum, DbClickhouseMode>((&policy.mode).into())
.bind::<sql_types::SmallInt, SqlU8>(
policy.mode.target_servers().into(),
)
.bind::<sql_types::SmallInt, SqlU8>(
policy.mode.target_keepers().into(),
)
.bind::<sql_types::Timestamptz, _>(policy.time_created)
.bind::<sql_types::BigInt, SqlU32>(prev_version.into())
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?
self.clickhouse_policy_insert_next_policy(opctx, &policy).await?
};

match num_inserted {
Expand All @@ -133,6 +110,40 @@ impl DataStore {
}
}

/// Insert the next version of the policy in the database
///
/// Only succeeds if the prior version is the latest version currently
/// in the `clickhouse_policy` table.
///
/// Panics if `policy.version <= 1`;
async fn clickhouse_policy_insert_next_policy(
&self,
opctx: &OpContext,
policy: &ClickhousePolicy,
) -> Result<usize, Error> {
assert!(policy.version > 1);
let prev_version = policy.version - 1;

sql_query(
r"INSERT INTO clickhouse_policy
(version, clickhouse_mode, clickhouse_cluster_target_servers,
clickhouse_cluster_target_keepers, time_created)
SELECT $1, $2, $3, $4, $5
FROM clickhouse_policy WHERE version = $6 AND version IN
(SELECT version FROM clickhouse_policy
ORDER BY version DESC LIMIT 1)",
)
.bind::<sql_types::BigInt, SqlU32>(policy.version.into())
.bind::<ClickhouseModeEnum, DbClickhouseMode>((&policy.mode).into())
.bind::<sql_types::SmallInt, SqlU8>(policy.mode.target_servers().into())
.bind::<sql_types::SmallInt, SqlU8>(policy.mode.target_keepers().into())
.bind::<sql_types::Timestamptz, _>(policy.time_created)
.bind::<sql_types::BigInt, SqlU32>(prev_version.into())
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Insert the first clickhouse policy in the database at version 1.
///
/// Only insert this policy if no other policy exists yet.
Expand All @@ -144,10 +155,10 @@ impl DataStore {
policy: &ClickhousePolicy,
) -> Result<usize, Error> {
sql_query(
r"INSERT INTO clickhouse_policy
(version, clickhouse_mode, clickhouse_cluster_target_servers,
clickhouse_cluster_target_keepers, time_created)
SELECT $1, $2, $3, $4, $5
r"INSERT INTO clickhouse_policy
(version, clickhouse_mode, clickhouse_cluster_target_servers,
clickhouse_cluster_target_keepers, time_created)
SELECT $1, $2, $3, $4, $5
WHERE NOT EXISTS (SELECT * FROM clickhouse_policy)",
)
.bind::<sql_types::BigInt, SqlU32>(policy.version.into())
Expand Down
29 changes: 13 additions & 16 deletions nexus/types/src/deployment/planning_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,28 @@ impl PlanningInput {
}

pub fn target_clickhouse_zone_count(&self) -> usize {
if let Some(policy) = &self.policy.clickhouse_policy {
match policy.mode {
ClickhouseMode::SingleNodeOnly => {
SINGLE_NODE_CLICKHOUSE_REDUNDANCY
}
ClickhouseMode::ClusterOnly { .. } => 0,
ClickhouseMode::Both { .. } => {
SINGLE_NODE_CLICKHOUSE_REDUNDANCY
}
}
} else {
SINGLE_NODE_CLICKHOUSE_REDUNDANCY
match self.policy.clickhouse_policy.as_ref().map(|policy| &policy.mode)
{
Some(&ClickhouseMode::ClusterOnly { .. }) => 0,
Some(&ClickhouseMode::SingleNodeOnly)
| Some(&ClickhouseMode::Both { .. })
| None => SINGLE_NODE_CLICKHOUSE_REDUNDANCY,
}
}

pub fn target_clickhouse_server_zone_count(&self) -> usize {
self.policy
.clickhouse_policy
.as_ref()
.map(|policy| policy.mode.target_servers() as usize)
.map(|policy| usize::from(policy.mode.target_servers()))
.unwrap_or(0)
}

pub fn target_clickhouse_keeper_zone_count(&self) -> usize {
self.policy
.clickhouse_policy
.as_ref()
.map(|policy| policy.mode.target_keepers() as usize)
.map(|policy| usize::from(policy.mode.target_keepers()))
.unwrap_or(0)
}

Expand Down Expand Up @@ -912,14 +906,17 @@ impl ClickhouseMode {
pub fn cluster_enabled(&self) -> bool {
match self {
ClickhouseMode::SingleNodeOnly => false,
_ => true,
ClickhouseMode::ClusterOnly { .. }
| ClickhouseMode::Both { .. } => true,
}
}

pub fn single_node_enabled(&self) -> bool {
match self {
ClickhouseMode::ClusterOnly { .. } => false,
_ => true,
ClickhouseMode::SingleNodeOnly | ClickhouseMode::Both { .. } => {
true
}
}
}

Expand Down

0 comments on commit 108ec11

Please sign in to comment.