Skip to content

Commit

Permalink
feat: cache logical region's metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Oct 12, 2024
1 parent aaa9b32 commit 1b7ab29
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 18 deletions.
26 changes: 18 additions & 8 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,19 @@ impl MetricEngineInner {
/// Return the physical region id behind this logical region
async fn alter_logical_region(
&self,
region_id: RegionId,
logical_region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
let physical_region_id = {
let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
})?
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
};

// only handle adding column
Expand All @@ -87,7 +91,7 @@ impl MetricEngineInner {
.metadata_region
.column_semantic_type(
metadata_region_id,
region_id,
logical_region_id,
&col.column_metadata.column_schema.name,
)
.await?
Expand All @@ -102,18 +106,24 @@ impl MetricEngineInner {
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
region_id,
logical_region_id,
columns_to_add,
)
.await?;

// register columns to logical region
for col in columns {
self.metadata_region
.add_column(metadata_region_id, region_id, &col.column_metadata)
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.await?;
}

// invalid logical column cache
self.state
.write()
.unwrap()
.invalid_logical_column_cache(logical_region_id);

Ok(physical_region_id)
}

Expand Down
10 changes: 4 additions & 6 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> {
// project on logical columns
let all_logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.load_logical_column_names(physical_region_id, logical_region_id)
.await?;
let projected_logical_names = origin_projection
.iter()
.map(|i| all_logical_columns[*i].column_schema.name.clone())
.map(|i| all_logical_columns[*i].clone())
.collect::<Vec<_>>();

// generate physical projection
Expand All @@ -200,10 +200,8 @@ impl MetricEngineInner {
logical_region_id: RegionId,
) -> Result<Vec<usize>> {
let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|col| col.column_schema.name);
.load_logical_column_names(physical_region_id, logical_region_id)
.await?;
let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self
Expand Down
57 changes: 53 additions & 4 deletions src/metric-engine/src/engine/region_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,74 @@ use crate::error::Result;
impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered on [ColumnId].
/// The return value is ordered on column name.
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// load logical and physical columns, and intersect them to get logical column metadata
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns.clone());
}

// Else load from metadata region and update the cache.
// Load logical and physical columns, and intersect them to get logical column metadata.
let mut logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();

// sort columns on column id to ensure the order
// Sort columns on column name to ensure the order
logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
// Update cache
self.state
.write()
.unwrap()
.add_logical_columns(logical_region_id, logical_column_metadata.clone());

Ok(logical_column_metadata)
}

/// Load logical column names of a logical region.
///
/// The return value is ordered on column name alphabetically.
pub async fn load_logical_column_names(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<String>> {
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns
.iter()
.map(|c| c.column_schema.name.clone())
.collect());
}

// Else load from metadata region
let columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|c| c.column_schema.name)
.collect::<Vec<_>>();

Ok(columns)
}
}
30 changes: 30 additions & 0 deletions src/metric-engine/src/engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::collections::{HashMap, HashSet};

use snafu::OptionExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;

use crate::error::{PhysicalRegionNotFoundSnafu, Result};
Expand All @@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState {
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: HashMap<RegionId, HashSet<String>>,
/// Cache for the column metadata of logical regions.
/// The column order is the same with the order in the metadata, which is
/// alphabetically ordered on column name.
logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
}

impl MetricEngineState {
Expand Down Expand Up @@ -80,6 +85,21 @@ impl MetricEngineState {
.insert(logical_region_id, physical_region_id);
}

/// Add and reorder logical columns.
///
/// Caller should make sure:
/// 1. there is no duplicate columns
/// 2. the column order is the same with the order in the metadata, which is
/// alphabetically ordered on column name.
pub fn add_logical_columns(
&mut self,
logical_region_id: RegionId,
new_columns: impl IntoIterator<Item = ColumnMetadata>,
) {
let columns = self.logical_columns.entry(logical_region_id).or_default();
columns.extend(new_columns);
}

pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
self.logical_regions.get(&logical_region_id).copied()
}
Expand All @@ -88,6 +108,10 @@ impl MetricEngineState {
&self.physical_columns
}

pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
&self.logical_columns
}

pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
&self.physical_regions
}
Expand Down Expand Up @@ -129,9 +153,15 @@ impl MetricEngineState {
.unwrap() // Safety: physical_region_id is got from physical_regions
.remove(&logical_region_id);

self.logical_columns.remove(&logical_region_id);

Ok(())
}

pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
self.logical_columns.remove(&logical_region_id);
}

pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}
Expand Down

0 comments on commit 1b7ab29

Please sign in to comment.