Skip to content

Commit

Permalink
feat(query): create drop inverted index (databendlabs#14859)
Browse files Browse the repository at this point in the history
* feat(query): create drop inverted index

* fix typos

* fix tests

* fix tests

* rewrite as table index

* index use column ids

* check column id exist
  • Loading branch information
b41sh authored Mar 9, 2024
1 parent eeaf97c commit a9d1fac
Show file tree
Hide file tree
Showing 60 changed files with 1,332 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum Feature {
BackgroundService,
DataMask,
AggregateIndex,
InvertedIndex,
ComputedColumn,
StorageEncryption,
Stream,
Expand Down Expand Up @@ -59,6 +60,9 @@ impl Display for Feature {
Feature::AggregateIndex => {
write!(f, "aggregate_index")
}
Feature::InvertedIndex => {
write!(f, "inverted_index")
}
Feature::ComputedColumn => {
write!(f, "computed_column")
}
Expand Down
14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use databend_common_meta_app::schema::CreateIndexReply;
use databend_common_meta_app::schema::CreateIndexReq;
use databend_common_meta_app::schema::CreateLockRevReply;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::CreateTableIndexReply;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReply;
Expand All @@ -38,6 +40,8 @@ use databend_common_meta_app::schema::DropDatabaseReq;
use databend_common_meta_app::schema::DropIndexReply;
use databend_common_meta_app::schema::DropIndexReq;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableIndexReply;
use databend_common_meta_app::schema::DropTableIndexReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::schema::DropVirtualColumnReply;
use databend_common_meta_app::schema::DropVirtualColumnReq;
Expand Down Expand Up @@ -239,6 +243,16 @@ pub trait SchemaApi: Send + Sync {
req: SetTableColumnMaskPolicyReq,
) -> Result<SetTableColumnMaskPolicyReply, KVAppError>;

async fn create_table_index(
&self,
req: CreateTableIndexReq,
) -> Result<CreateTableIndexReply, KVAppError>;

async fn drop_table_index(
&self,
req: DropTableIndexReq,
) -> Result<DropTableIndexReply, KVAppError>;

async fn get_drop_table_infos(
&self,
req: ListDroppedTableReq,
Expand Down
152 changes: 152 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ use databend_common_meta_app::schema::CreateIndexReq;
use databend_common_meta_app::schema::CreateLockRevReply;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReply;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReply;
Expand All @@ -98,6 +100,8 @@ use databend_common_meta_app::schema::DropDatabaseReq;
use databend_common_meta_app::schema::DropIndexReply;
use databend_common_meta_app::schema::DropIndexReq;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableIndexReply;
use databend_common_meta_app::schema::DropTableIndexReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::schema::DropVirtualColumnReply;
use databend_common_meta_app::schema::DropVirtualColumnReq;
Expand Down Expand Up @@ -148,6 +152,7 @@ use databend_common_meta_app::schema::TableIdList;
use databend_common_meta_app::schema::TableIdListKey;
use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableIndex;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableInfoFilter;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -3131,6 +3136,153 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn create_table_index(
&self,
req: CreateTableIndexReq,
) -> Result<CreateTableIndexReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tbid = TableId {
table_id: req.table_id,
};

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, table_meta): (_, Option<TableMeta>) =
get_pb_value(self, &tbid).await?;

debug!(ident :% =(&tbid); "create_table_index");

if tb_meta_seq == 0 || table_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(req.table_id, "create_table_index"),
)));
}

let mut table_meta = table_meta.unwrap();
// update table indexes
let indexes = &mut table_meta.indexes;
if indexes.contains_key(&req.name) {
match req.create_option {
CreateOption::None => {
return Err(KVAppError::AppError(AppError::IndexAlreadyExists(
IndexAlreadyExists::new(&req.name, "create table index".to_string()),
)));
}
CreateOption::CreateIfNotExists => {
return Ok(CreateTableIndexReply {});
}
CreateOption::CreateOrReplace => {}
}
}
// check the index column id exists
for column_id in &req.column_ids {
if table_meta.schema.is_column_deleted(*column_id) {
return Err(KVAppError::AppError(AppError::UnknownIndex(
UnknownIndex::new(
&req.name,
format!("table index column id {} is not exist", column_id),
),
)));
}
}
let index = TableIndex {
name: req.name.clone(),
column_ids: req.column_ids.clone(),
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest {
condition: vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
],
if_then: vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
id :? =(&tbid),
succ = succ;
"create_table_index"
);

if succ {
return Ok(CreateTableIndexReply {});
}
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn drop_table_index(
&self,
req: DropTableIndexReq,
) -> Result<DropTableIndexReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tbid = TableId {
table_id: req.table_id,
};

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, table_meta): (_, Option<TableMeta>) =
get_pb_value(self, &tbid).await?;

debug!(ident :% =(&tbid); "drop_table_index");

if tb_meta_seq == 0 || table_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(req.table_id, "drop_table_index"),
)));
}

let mut table_meta = table_meta.unwrap();
// update table indexes
let indexes = &mut table_meta.indexes;
if !indexes.contains_key(&req.name) && !req.if_exists {
return Err(KVAppError::AppError(AppError::UnknownIndex(
UnknownIndex::new(&req.name, "drop table index".to_string()),
)));
}
indexes.remove(&req.name);

let txn_req = TxnRequest {
condition: vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
],
if_then: vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
id :? =(&tbid),
succ = succ;
"drop_table_index"
);

if succ {
return Ok(DropTableIndexReply {});
}
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn get_drop_table_infos(
Expand Down
Loading

0 comments on commit a9d1fac

Please sign in to comment.