feat(cluster): support system-managed cluster #17051
The naming seems a bit inconsistent with the keys used in the meta-service and the corresponding variables. Aligning them might improve clarity.
Reviewed 1 of 9 files at r1, 10 of 61 files at r2.
Reviewable status: 11 of 62 files reviewed, 8 unresolved discussions (waiting on @zhang2014)
src/query/management/src/warehouse/warehouse_api.rs
line 22 at r2 (raw file):
#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
pub enum SelectedNode {
    Random(Option<String>),
What does the string in it mean? If the code isn't self-explanatory, it deserves a documentation comment.
src/query/management/src/warehouse/warehouse_api.rs
line 29 at r2 (raw file):
#[derive(serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
pub enum WarehouseInfo {
    SelfManaged(String),
This needs a doc comment explaining what the string means.
src/query/management/src/warehouse/warehouse_mgr.rs
line 87 at r2 (raw file):
        escape_for_key(tenant)?,
    ),
})
I'm a bit confused by the field names: they are inconsistent with the key names and with the comments. GPT produced the refined version of this part shown in the suggestion. Does it make sense?
It would also be clearer to explain the key format used under each of these prefixes.
Suggestion:
Ok(WarehouseMgr {
    metastore,
    lift_time,
    // Prefix for all online nodes of the tenant
    online_nodes_key_prefix: format!(
        "{}/{}/online_nodes",
        WAREHOUSE_API_KEY_PREFIX,
        escape_for_key(tenant)?
    ),
    // Prefix for all online computing clusters of the tenant
    online_clusters_key_prefix: format!(
        "{}/{}/online_clusters",
        WAREHOUSE_API_KEY_PREFIX,
        escape_for_key(tenant)?
    ),
    // Prefix for all warehouses of the tenant (must ensure compatibility across all versions)
    warehouses_key_prefix: format!(
        "{}/v1/{}",
        WAREHOUSE_META_KEY_PREFIX,
        escape_for_key(tenant)?,
    ),
})
src/query/management/src/warehouse/warehouse_mgr.rs
line 138 at r2 (raw file):
let mut txn = TxnRequest::default();
let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(&node.id)?);
This should be replaced with `self.node_key(&node)`.
src/query/management/src/warehouse/warehouse_mgr.rs
line 148 at r2 (raw file):
));
let warehouse_node_key = self.cluster_key(&node)?;
The variable name `warehouse_node_key` is inconsistent with the method on the right-hand side, `cluster_key()`.
src/query/management/src/warehouse/warehouse_mgr.rs
line 154 at r2 (raw file):
    self.warehouse_key_prefix,
    escape_for_key(&node.warehouse_id)?
);
Could you introduce another key-building method for this?
Code quote:
let warehouse_info_key = format!(
    "{}/{}",
    self.warehouse_key_prefix,
    escape_for_key(&node.warehouse_id)?
);
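A minimal sketch of what such key-building methods could look like, so that each key layout lives in exactly one place. The `WarehouseKeys` struct and the `warehouse_info_key` name are hypothetical, and `escape_for_key` here is a simplified stand-in written for this example, not the project's real function:

```rust
use std::fmt::Write;

/// Simplified stand-in for the project's `escape_for_key` (an assumption):
/// percent-encodes every byte outside `[A-Za-z0-9_-]`, so an id cannot
/// smuggle a `/` into the key hierarchy.
fn escape_for_key(raw: &str) -> Result<String, std::fmt::Error> {
    let mut out = String::with_capacity(raw.len());
    for b in raw.bytes() {
        match b {
            b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-' => out.push(b as char),
            other => write!(out, "%{:02x}", other)?,
        }
    }
    Ok(out)
}

/// Hypothetical holder of the two prefixes quoted in the review.
struct WarehouseKeys {
    node_key_prefix: String,
    warehouse_key_prefix: String,
}

impl WarehouseKeys {
    /// Key of a single node: `<node_key_prefix>/<escaped node id>`.
    fn node_key(&self, node_id: &str) -> Result<String, std::fmt::Error> {
        Ok(format!("{}/{}", self.node_key_prefix, escape_for_key(node_id)?))
    }

    /// Key of a warehouse's info record: `<warehouse_key_prefix>/<escaped warehouse id>`.
    fn warehouse_info_key(&self, warehouse_id: &str) -> Result<String, std::fmt::Error> {
        Ok(format!(
            "{}/{}",
            self.warehouse_key_prefix,
            escape_for_key(warehouse_id)?
        ))
    }
}
```

With helpers like these, the call site shrinks to something like `self.warehouse_info_key(&node.warehouse_id)?`, and the variable name can mirror the method name.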
src/query/management/src/warehouse/warehouse_mgr.rs
line 157 at r2 (raw file):
node.cluster_id = String::new();
node.warehouse_id = String::new();
This feels a bit odd: at the beginning of this method, it asserts that these two fields are non-empty, but later they are cleared.
Would it make sense to pass `cluster_id` and `warehouse_id` as separate arguments to this method instead?
Code quote:
node.cluster_id = String::new();
node.warehouse_id = String::new();
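One possible reading of this suggestion, as a rough sketch only: the ids arrive as their own arguments, are validated once, and the node value is never mutated. The trimmed `NodeInfo`, the `cluster_node_key` function, and the `prefix/warehouse/cluster/node` layout are all assumptions made for illustration:

```rust
/// Trimmed, hypothetical stand-in for `NodeInfo`: it carries no `cluster_id`
/// or `warehouse_id`, so there is nothing to clear after building keys.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInfo {
    pub id: String,
}

/// Builds the key of a node inside a cluster from explicit ids.
/// The key layout is an assumption, not the project's actual format.
pub fn cluster_node_key(
    prefix: &str,
    warehouse_id: &str,
    cluster_id: &str,
    node: &NodeInfo,
) -> Result<String, String> {
    // Validate the ids where they are used instead of asserting on node fields.
    if warehouse_id.is_empty() {
        return Err("warehouse_id must not be empty".to_string());
    }
    if cluster_id.is_empty() {
        return Err("cluster_id must not be empty".to_string());
    }
    Ok(format!("{}/{}/{}/{}", prefix, warehouse_id, cluster_id, node.id))
}
```

Because the node is only borrowed immutably, the "assert non-empty, then clear" pattern disappears from the method body.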
src/meta/types/src/cluster.rs
line 100 at r2 (raw file):
pub warehouse_id: String,
pub runtime_resource_group: Option<String>,
Doesn't this field need `#[serde(skip_serializing_if = "Option::is_none")]`? Encoding a `null` value may break queries running an older version.
And for the three new fields, is `#[serde(skip)]` needed so that an older-version `NodeInfo` JSON without these fields can still be decoded?
# Conflicts: # src/query/config/src/config.rs
Previously, drmingdrmer (张炎泼) wrote…
We do not need to consider compatibility with older versions. The prefix has been changed, so the new version writes under the new prefix.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done.
Previously, drmingdrmer (张炎泼) wrote…
Done. Extracted the `unload_warehouse_info` function and added the comment `// Unload the warehouse and cluster from the node.`
system-managed warehouse?
@drmingdrmer Thanks for the very helpful comments. I have resolved the issues. Please review again.
…e into feat/managment_cluster
Reviewed 3 of 59 files at r4, 2 of 4 files at r5.
Reviewable status: 9 of 77 files reviewed, 10 unresolved discussions (waiting on @zhang2014)
src/query/management/src/warehouse/warehouse_mgr.rs
line 157 at r2 (raw file):
Previously, zhang2014 (Winter Zhang) wrote…
Done. Extracted the `unload_warehouse_info` function and added comments.
// Unload the warehouse and cluster from the node.
// 1. Used when a node is removed from the cluster.
// 2. For cluster_node_key: node_info, since its warehouse and cluster are already encoded in the key, we do not need to write the warehouse and cluster into its value again.
Have you considered removing `cluster_id` and `warehouse_id` from `NodeInfo`, since they are not actually stored but only used to generate some keys?
src/query/management/src/warehouse/warehouse_api.rs
line 29 at r2 (raw file):
Previously, zhang2014 (Winter Zhang) wrote…
Done.
In that case, making it `SelfManaged { cluster_id: String }` would be more self-explanatory.
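A small sketch of the struct-variant form, to show how the field name documents itself at both construction and match sites. Only `SelfManaged { cluster_id }` comes from the comment above; the payload of `SystemManaged` and the `cluster_id()` accessor are placeholders assumed for this example, and the serde derives are left out to keep the snippet dependency-free:

```rust
/// Sketch of `WarehouseInfo` with named fields instead of a bare `String`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WarehouseInfo {
    /// Self-managed warehouse, identified by the id of its cluster.
    SelfManaged {
        /// Id of the cluster that backs this warehouse.
        cluster_id: String,
    },
    /// System-managed warehouse. The `id` field is a placeholder assumption;
    /// the real payload is not shown in this review.
    SystemManaged { id: String },
}

impl WarehouseInfo {
    /// Returns the cluster id for self-managed warehouses, `None` otherwise.
    pub fn cluster_id(&self) -> Option<&str> {
        match self {
            WarehouseInfo::SelfManaged { cluster_id } => Some(cluster_id.as_str()),
            WarehouseInfo::SystemManaged { .. } => None,
        }
    }
}
```

A reader of `WarehouseInfo::SelfManaged { cluster_id }` no longer has to look up what the string stands for.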
src/query/management/src/warehouse/warehouse_mgr.rs
line 75 at r5 (raw file):
// Prefix for all online computing clusters of the tenant
meta_key_prefix: format!(
    "{}/{}/online_clusters_v2",
The meaning of the meta-key is still unclear. What does `meta` mean here?
src/query/management/src/warehouse/warehouse_mgr.rs
line 179 at r5 (raw file):
    serde_json::to_vec(&warehouse_info)?,
    Some(self.lift_time),
));
The comment says that the warehouse info is written conditionally, but the code always writes it unconditionally.
Should the comment be `// self-managed warehouse requires to persist warehouse info`, or something else?
Code quote:
// upsert warehouse info if self-managed.
txn.if_then.push(TxnOp::put_with_ttl(
    warehouse_info_key.clone(),
    serde_json::to_vec(&warehouse_info)?,
    Some(self.lift_time),
));
src/query/management/src/warehouse/warehouse_mgr.rs
line 186 at r5 (raw file):
if seq != MatchSeq::Exact(0) {
    return Ok(self.metastore.transaction(txn).await?);
}
It's a little weird: why not retry if `seq != MatchSeq::Exact(0)`?
This function's retry behavior is inconsistent. When `seq` equals `MatchSeq::Exact(0)`, it retries until the update succeeds. However, for any other `seq` value, it makes no retry attempts. If retrying should be optional, it should be controlled by an explicit parameter rather than being tied to the `seq` value.
Code quote:
if seq != MatchSeq::Exact(0) {
    return Ok(self.metastore.transaction(txn).await?);
}
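To illustrate the "explicit parameter" idea, here is a minimal, synchronous sketch. The `Retry` enum and `run_with_retry` are hypothetical names, and the closure stands in for "build and submit the transaction"; the real code is async and talks to the meta-service, which this example does not model:

```rust
/// Hypothetical retry policy chosen by the caller, instead of being
/// inferred from whether `seq` happens to be `Exact(0)`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Retry {
    /// Try exactly once.
    Never,
    /// Try once, then retry at most this many additional times.
    UpTo(u32),
}

/// Calls `attempt` until it reports success or the policy is exhausted.
/// `attempt` receives the zero-based attempt number and returns
/// `Ok(true)` if the transaction committed, `Ok(false)` on a conflict.
pub fn run_with_retry<E>(
    policy: Retry,
    mut attempt: impl FnMut(u32) -> Result<bool, E>,
) -> Result<bool, E> {
    let max_attempts = match policy {
        Retry::Never => 1,
        Retry::UpTo(n) => n.saturating_add(1),
    };
    for i in 0..max_attempts {
        // Errors abort immediately; only conflicts are retried.
        if attempt(i)? {
            return Ok(true);
        }
    }
    Ok(false)
}
```

With this shape, insert and update callers would pass whichever policy they want, and the `seq` value only decides what the transaction condition is.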
src/query/management/src/warehouse/warehouse_mgr.rs
line 200 at r5 (raw file):
warehouse_txn
    .else_then
    .push(TxnOp::get(warehouse_info_key.clone()));
It looks like this `else` operation that gets the warehouse could be added before entering the loop?
Code quote:
warehouse_txn
    .else_then
    .push(TxnOp::get(warehouse_info_key.clone()));
src/query/management/src/warehouse/warehouse_mgr.rs
line 236 at r5 (raw file):
    }
    response => Ok(response),
};
This piece of logic is deeply nested and quite hard to understand. I asked GPT to rewrite it to reduce the indentation.
Does it reflect the logic correctly?
let mut response = self.metastore.transaction(warehouse_txn).await?;
if response.success {
    return Ok(response);
}
let resp = match response.responses.pop().and_then(|x| x.response) {
    Some(Response::Get(data)) => data,
    _ => return Ok(response),
};
let value = match resp.value {
    None => return Ok(response),
    Some(value) if value.seq == 0 => return Ok(response),
    Some(value) => value,
};
let warehouse_info = serde_json::from_slice(&value.data)?;
match warehouse_info {
    WarehouseInfo::SystemManaged(_) => {
        return Err(ErrorCode::WarehouseAlreadyExists(
            "Already exists same name system-managed warehouse.",
        ));
    }
    WarehouseInfo::SelfManaged(_) => {
        // Check if node already exists
        if let Some(TxnOpResponse {
            response: Some(Response::Get(TxnGetResponse { value: Some(value), .. })),
        }) = response.responses.first()
        {
            if value.seq != 0 {
                return Ok(response);
            }
        }
        log::info!(
            "Self-managed warehouse has already been created by other nodes; attempt to join it. Retry count: {}",
            retry_count
        );
        retry_count += 1;
        exact_seq = value.seq;
        continue;
    }
}
Code quote:
return match self.metastore.transaction(warehouse_txn).await? {
    mut response if !response.success => {
        return match response.responses.pop().and_then(|x| x.response) {
            Some(Response::Get(data)) => match data.value {
                None => Ok(response),
                Some(value) if value.seq == 0 => Ok(response),
                Some(value) => match serde_json::from_slice(&value.data)? {
                    WarehouseInfo::SystemManaged(_) => {
                        Err(ErrorCode::WarehouseAlreadyExists(
                            "Already exists same name system-managed warehouse.",
                        ))
                    }
                    WarehouseInfo::SelfManaged(_) => match response.responses.first() {
                        // already exists node.
                        Some(TxnOpResponse {
                            response:
                                Some(Response::Get(TxnGetResponse {
                                    value: Some(value),
                                    ..
                                })),
                        }) if value.seq != 0 => Ok(response),
                        _ => {
                            log::info!("Self-managed warehouse has already been created by other nodes; attempt to join it. Retry count: {}", retry_count);
                            retry_count += 1;
                            exact_seq = value.seq;
                            continue;
                        }
                    },
                },
            },
            _ => Ok(response),
        };
    }
    response => Ok(response),
};
src/query/management/src/warehouse/warehouse_mgr.rs
line 321 at r5 (raw file):
        // std::mem::swap(&mut node_info.warehouse_id, &mut warehouse_id);
        Err(err)
    }
Suggestion:
match upsert_node.await? {
src/query/management/src/warehouse/warehouse_mgr.rs
line 327 at r5 (raw file):
        // std::mem::swap(&mut node_info.warehouse_id, &mut warehouse_id);
        Ok(seq)
    }
No need to roll back here, right?
Code quote:
match upsert_node.await {
    Err(err) => {
        // rollback
        // std::mem::swap(&mut node_info.cluster_id, &mut cluster_id);
        // std::mem::swap(&mut node_info.warehouse_id, &mut warehouse_id);
        Err(err)
    }
    Ok(response) if !response.success => {
        // rollback
        // std::mem::swap(&mut node_info.cluster_id, &mut cluster_id);
        // std::mem::swap(&mut node_info.warehouse_id, &mut warehouse_id);
        Ok(seq)
    }
src/query/management/src/warehouse/warehouse_mgr.rs
line 444 at r5 (raw file):
            continue 'retry;
        }
    }
There is too much nesting here, which makes the block hard to follow.
A rewritten version of this block with less indentation looks like:
let response = self.metastore.transaction(after_txn).await?;
if !response.success {
    continue 'retry;
}
let mut consistent_nodes = Vec::with_capacity(response.responses.len());
for (idx, response) in response.responses.into_iter().enumerate() {
    let get_response = match response.response {
        Some(Response::Get(response)) => response,
        _ => continue 'retry,
    };
    let value = match get_response.value {
        Some(value) => value,
        _ => continue 'retry,
    };
    let node_info = serde_json::from_slice::<NodeInfo>(&value.data)?;
    assert_eq!(node_info.warehouse_id, id);
    assert!(!node_info.cluster_id.is_empty());
    consistent_nodes.push(ConsistentNodeInfo {
        node_seq: value.seq,
        cluster_seq: cluster_node_seq[idx],
        node_info,
    });
}
if consistent_nodes.len() != cluster_node_seq.len() {
    continue 'retry;
}
return Ok(ConsistentWarehouseInfo {
    info_seq: before_info.seq,
    warehouse_info: serde_json::from_slice(&before_info.data)?,
    consistent_nodes,
});
Code quote:
match self.metastore.transaction(after_txn).await? {
    response if response.success => {
        let mut consistent_nodes = Vec::with_capacity(response.responses.len());
        for (idx, response) in response.responses.into_iter().enumerate() {
            match response.response {
                // TODO: maybe ignore none(not need retry)
                Some(Response::Get(response)) => match response.value {
                    Some(value) => {
                        let node_info =
                            serde_json::from_slice::<NodeInfo>(&value.data)?;
                        assert_eq!(node_info.warehouse_id, id);
                        assert!(!node_info.cluster_id.is_empty());
                        consistent_nodes.push(ConsistentNodeInfo {
                            node_seq: value.seq,
                            cluster_seq: cluster_node_seq[idx],
                            node_info,
                        });
                    }
                    _ => {
                        continue 'retry;
                    }
                },
                _ => {
                    continue 'retry;
                }
            }
        }
        if consistent_nodes.len() == cluster_node_seq.len() {
            return Ok(ConsistentWarehouseInfo {
                info_seq: before_info.seq,
                warehouse_info: serde_json::from_slice(&before_info.data)?,
                consistent_nodes,
            });
        }
    }
    _ => {
        continue 'retry;
    }
}
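As a further illustration of the same flattening idea, Rust's `let ... else` can collapse the two nested matches into one guard per response. This is only a sketch over simplified stand-in types: `SeqValue`, `Response`, and `collect_values` are invented for the example and are not the meta-service types used in the PR. Returning `None` here plays the role of `continue 'retry` in the caller:

```rust
/// Simplified stand-in for a versioned value returned by a `get`.
#[derive(Debug, PartialEq, Eq)]
pub struct SeqValue {
    pub seq: u64,
    pub data: Vec<u8>,
}

/// Simplified stand-in for one operation response inside a transaction reply.
#[derive(Debug, PartialEq, Eq)]
pub enum Response {
    Get(Option<SeqValue>),
    Put,
}

/// Extracts the value from every response, or returns `None` as soon as one
/// response is not a `Get` with a value (the caller would then retry).
pub fn collect_values(responses: Vec<Response>) -> Option<Vec<SeqValue>> {
    let mut values = Vec::with_capacity(responses.len());
    for response in responses {
        // One guard replaces the nested `match response { ... match value { ... } }`.
        let Response::Get(Some(value)) = response else {
            return None;
        };
        values.push(value);
    }
    Some(values)
}
```

The happy path then reads top to bottom with a single level of indentation inside the loop.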
Do we have a config to disable these WAREHOUSE statements? We have a cloud deployment with its own warehouse concept, and until we integrate the WAREHOUSE statements with the underlying Kubernetes primitives, it would be good to be able to disable them to avoid concept conflicts in possible corner cases.
By default (without the `resources_management` configuration), we return an unimplemented error. Is this enough?
SGTM 👍
### Improvements:
- Added retry mechanism with a fallback in the retry loop.
- Return an error if an unexpected response is received when `TxnGetResponse` is expected.
- Refined quit-retry condition: now only triggered when the seq of `NodeInfo` changes.
### Refactoring:
- Simplified and decoupled nested branching for better readability and maintainability.
- Consolidated related logic, e.g., building `txn if_then` operations in a single place.
- Differentiated `NodeInfo` with and without warehouse-related information.
### Documentation:
- Added details explaining behavioral differences between insert and update modes.
refactor: improve and clean up `warehouse_mgr::upsert_self_managed()`
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Summary
feat(cluster): support system-managed cluster
Add new sql statement