Skip to content

Commit

Permalink
fix: prevent registering logical regions with AliveKeeper (#3965)
Browse files Browse the repository at this point in the history
* fix: register logical region

* chore: fix Clippy

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 17, 2024
1 parent 0168d43 commit f696f41
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 75 deletions.
28 changes: 23 additions & 5 deletions src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use snafu::{location, Location, Snafu};
use tonic::{Code, Status};

#[derive(Snafu)]
Expand Down Expand Up @@ -83,14 +83,28 @@ pub enum Error {
},

#[snafu(display("Failed to request RegionServer, code: {}", code))]
RegionServer { code: Code, source: BoxedError },
RegionServer {
code: Code,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},

// Server error carried in Tonic Status's metadata.
#[snafu(display("{}", msg))]
Server { code: StatusCode, msg: String },
Server {
code: StatusCode,
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Illegal Database response: {err_msg}"))]
IllegalDatabaseResponse { err_msg: String },
IllegalDatabaseResponse {
err_msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to send request with streaming: {}", err_msg))]
ClientStreaming {
Expand Down Expand Up @@ -148,7 +162,11 @@ impl From<Status> for Error {
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());

Self::Server { code, msg }
Self::Server {
code,
msg,
location: location!(),
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl RegionRequester {
error::Error::RegionServer {
code,
source: BoxedError::new(err),
location: location!(),
}
})?
.into_inner();
Expand Down Expand Up @@ -272,7 +273,7 @@ mod test {
err_msg: "blabla".to_string(),
}),
}));
let Server { code, msg } = result.unwrap_err() else {
let Server { code, msg, .. } = result.unwrap_err() else {
unreachable!()
};
assert_eq!(code, StatusCode::Internal);
Expand Down
6 changes: 4 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ mod tests {
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;

Expand All @@ -528,7 +529,7 @@ mod tests {
let txn = mgr
.build_create_txn(
1028,
"mock",
MITO_ENGINE_NAME,
"foo/bar/weny",
HashMap::from([("foo".to_string(), "bar".to_string())]),
HashMap::default(),
Expand All @@ -542,8 +543,9 @@ mod tests {

#[tokio::test]
async fn test_initialize_region_server() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (mock_region, mut mock_region_handler) = MockRegionEngine::new();
let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

mock_region_server.register_engine(mock_region.clone());

Expand Down
73 changes: 39 additions & 34 deletions src/datanode/src/heartbeat/handler/upgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ mod tests {
use std::time::Duration;

use common_meta::instruction::{InstructionReply, UpgradeRegion};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;
Expand All @@ -133,7 +134,7 @@ mod tests {
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);

let handler_context = HandlerContext {
Expand Down Expand Up @@ -167,13 +168,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);

let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
// Should be unreachable.
unreachable!();
}));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
// Should be unreachable.
unreachable!();
}));
});
mock_region_server.register_test_region(region_id, mock_engine);

let handler_context = HandlerContext {
Expand Down Expand Up @@ -207,13 +209,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);

let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
mock_region_server.register_test_region(region_id, mock_engine);

let handler_context = HandlerContext {
Expand Down Expand Up @@ -247,13 +250,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);

let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
mock_region_server.register_test_region(region_id, mock_engine);

let waits = vec![
Expand Down Expand Up @@ -308,18 +312,19 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);

let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock_error".to_string(),
}
.fail()
}));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock_error".to_string(),
}
.fail()
}));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
mock_region_server.register_test_region(region_id, mock_engine);

let handler_context = HandlerContext {
Expand Down
Loading

0 comments on commit f696f41

Please sign in to comment.