Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix open region missing path #2441

Merged
merged 4 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,34 @@ impl Display for SimpleReply {
}
}

impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={})",
self.region_ident, self.region_storage_path
)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
}

impl OpenRegion {
pub fn new(region_ident: RegionIdent, path: &str) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction {
OpenRegion(RegionIdent),
OpenRegion(OpenRegion),
CloseRegion(RegionIdent),
InvalidateTableIdCache(TableId),
InvalidateTableNameCache(TableName),
Expand Down Expand Up @@ -93,18 +118,21 @@ mod tests {

#[test]
fn test_serialize_instruction() {
let open_region = Instruction::OpenRegion(RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
});
let open_region = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
));

let serialized = serde_json::to_string(&open_region).unwrap();

assert_eq!(
r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo"}}"#,
serialized
);

Expand Down
10 changes: 7 additions & 3 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use common_query::Output;
use common_telemetry::error;
use snafu::OptionExt;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

Expand All @@ -43,11 +44,14 @@ impl RegionHeartbeatResponseHandler {

fn instruction_to_request(instruction: Instruction) -> MetaResult<(RegionId, RegionRequest)> {
match instruction {
Instruction::OpenRegion(region_ident) => {
Instruction::OpenRegion(OpenRegion {
region_ident,
region_storage_path,
}) => {
let region_id = Self::region_ident_to_region_id(&region_ident);
let open_region_req = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: "".to_string(),
region_dir: region_dir(&region_storage_path, region_id),
options: HashMap::new(),
});
Ok((region_id, open_region_req))
Expand Down
19 changes: 11 additions & 8 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent};
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, RegionIdent};
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
Expand Down Expand Up @@ -90,13 +90,16 @@ fn close_region_instruction() -> Instruction {
}

fn open_region_instruction() -> Instruction {
Instruction::OpenRegion(RegionIdent {
table_id: 1024,
region_number: 0,
cluster_id: 1,
datanode_id: 2,
engine: "mito2".to_string(),
})
Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
table_id: 1024,
region_number: 0,
cluster_id: 1,
datanode_id: 2,
engine: "mito2".to_string(),
},
"path/dir",
))
}

pub struct MockQueryEngine;
Expand Down
14 changes: 10 additions & 4 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ mod tests {

use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
Expand Down Expand Up @@ -426,6 +428,7 @@ mod tests {
pub context: RegionFailoverContext,
pub heartbeat_receivers: HashMap<DatanodeId, Receiver<tonic::Result<HeartbeatResponse>>>,
pub pushers: Pushers,
pub path: String,
}

impl TestingEnv {
Expand Down Expand Up @@ -549,6 +552,7 @@ mod tests {
},
pushers,
heartbeat_receivers,
path: region_storage_path(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).to_string(),
}
}
}
Expand Down Expand Up @@ -607,16 +611,18 @@ mod tests {
for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() {
let mailbox_clone = env.context.mailbox.clone();
let failed_region_clone = failed_region.clone();
let path = env.path.to_string();
let candidate_tx = candidate_tx.clone();
let _handle = common_runtime::spawn_bg(async move {
let resp = recv.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(
failed_region_clone.clone()
))
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
failed_region_clone,
&path
)))
.unwrap(),
))
);
Expand Down
36 changes: 30 additions & 6 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ use std::time::Duration;

use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateRegionMetadata;
use super::{RegionFailoverContext, State};
use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_region_manager::InactiveRegionManager;
Expand All @@ -49,7 +50,22 @@ impl ActivateRegion {
failed_region: &RegionIdent,
timeout: Duration,
) -> Result<MailboxReceiver> {
let instruction = Instruction::OpenRegion(failed_region.clone());
let table_id = failed_region.table_id;
// TODO(weny): considers fetching table info only once.
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.table_info;

let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

let instruction =
Instruction::OpenRegion(OpenRegion::new(failed_region.clone(), &region_storage_path));

let msg = MailboxMessage::json_message(
"Activate Region",
Expand Down Expand Up @@ -179,7 +195,11 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap(),
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
failed_region.clone(),
&env.path
)))
.unwrap(),
))
);

Expand Down Expand Up @@ -241,7 +261,11 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap()
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
failed_region.clone(),
&env.path
)))
.unwrap(),
))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod tests {
mut heartbeat_receivers,
context,
pushers,
..
} = env;

for frontend_id in 4..=7 {
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableInfoNotFoundSnafu { table_id })?
.table_info;
let region_storage_patch =
let region_storage_path =
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
region_storage_path(&table_info.catalog_name, &table_info.schema_name);

let mut new_region_routes = table_route_value.region_routes.clone();
Expand All @@ -101,7 +101,7 @@ impl UpdateRegionMetadata {
.update_table_route(
table_id,
engine,
&region_storage_patch,
&region_storage_path,
table_route_value,
new_region_routes,
)
Expand Down
Loading