Skip to content

Commit

Permalink
feat(query): read write inverted index (databendlabs#14827)
Browse files Browse the repository at this point in the history
* feat(query): read write inverted index

* fix check

* fix check

* fix check

* fix fmt

* fix check

* Update src/query/ee/src/inverted_index/indexer.rs

Co-authored-by: Sky Fan <[email protected]>

---------

Co-authored-by: Bohu <[email protected]>
Co-authored-by: Sky Fan <[email protected]>
  • Loading branch information
3 people authored Mar 5, 2024
1 parent c3c40a2 commit d6b0a07
Show file tree
Hide file tree
Showing 29 changed files with 1,636 additions and 2 deletions.
267 changes: 266 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ paste = "1.0.9"
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = "0.21.1"
thiserror = { workspace = true }
tonic = { workspace = true }
7 changes: 6 additions & 1 deletion src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ build_exceptions! {
IllegalCloudControlMessageFormat(1703),

// Geometry errors.
GeometryError(1801)
GeometryError(1801),

// Tantivy errors.
TantivyError(1901),
TantivyOpenReadError(1902),
TantivyQueryParserError(1903)
}

// Meta service errors [2001, 3000].
Expand Down
18 changes: 18 additions & 0 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ impl From<GeozeroError> for ErrorCode {
}
}

impl From<tantivy::TantivyError> for ErrorCode {
fn from(error: tantivy::TantivyError) -> Self {
ErrorCode::TantivyError(error.to_string())
}
}

impl From<tantivy::directory::error::OpenReadError> for ErrorCode {
fn from(error: tantivy::directory::error::OpenReadError) -> Self {
ErrorCode::TantivyOpenReadError(error.to_string())
}
}

impl From<tantivy::query::QueryParserError> for ErrorCode {
fn from(error: tantivy::query::QueryParserError) -> Self {
ErrorCode::TantivyQueryParserError(error.to_string())
}
}

// === prost error ===
impl From<prost::EncodeError> for ErrorCode {
fn from(error: prost::EncodeError) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ databend-common-users = { path = "../users" }
databend-enterprise-aggregating-index = { path = "../ee_features/aggregating_index" }
databend-enterprise-background-service = { path = "../ee_features/background_service" }
databend-enterprise-data-mask-feature = { path = "../ee_features/data_mask" }
databend-enterprise-inverted-index = { path = "../ee_features/inverted_index" }
databend-enterprise-storage-encryption = { path = "../ee_features/storage_encryption" }
databend-enterprise-stream-handler = { path = "../ee_features/stream_handler" }
databend-enterprise-vacuum-handler = { path = "../ee_features/vacuum_handler" }
Expand Down
2 changes: 2 additions & 0 deletions src/query/ee/src/enterprise_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_license::license_manager::LicenseManager;
use crate::aggregating_index::RealAggregatingIndexHandler;
use crate::background_service::RealBackgroundService;
use crate::data_mask::RealDatamaskHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::license_mgr::RealLicenseManager;
use crate::storage_encryption::RealStorageEncryptionHandler;
use crate::storages::fuse::operations::RealVacuumHandler;
Expand All @@ -37,6 +38,7 @@ impl EnterpriseServices {
RealBackgroundService::init(&cfg).await?;
RealVirtualColumnHandler::init()?;
RealStreamHandler::init()?;
RealInvertedIndexHandler::init()?;
Ok(())
}
}
110 changes: 110 additions & 0 deletions src/query/ee/src/inverted_index/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::io::InvertedIndexWriter;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::ReadSettings;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::Location;

pub struct Indexer {}

impl Indexer {
pub(crate) fn new() -> Indexer {
Indexer {}
}

#[async_backtrace::framed]
pub(crate) async fn index(
&self,
fuse_table: &FuseTable,
ctx: Arc<dyn TableContext>,
schema: DataSchema,
segment_locs: Option<Vec<Location>>,
) -> Result<String> {
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
// no snapshot
return Ok("".to_string());
};
if schema.fields.is_empty() {
// no field for index
return Ok("".to_string());
}

let table_schema = &fuse_table.get_table_info().meta.schema;

// Collect field indices used by inverted index.
let mut field_indices = Vec::new();
for field in &schema.fields {
let field_index = table_schema.index_of(field.name())?;
field_indices.push(field_index);
}

let projection = Projection::Columns(field_indices);
let block_reader =
fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?;

let segment_reader =
MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());

let settings = ReadSettings::from_ctx(&ctx)?;
let write_settings = fuse_table.get_write_settings();
let storage_format = write_settings.storage_format;

let operator = fuse_table.get_operator_ref();

// If no segment locations are specified, iterates through all segments
let segment_locs = if let Some(segment_locs) = segment_locs {
segment_locs
} else {
snapshot.segments.clone()
};

let mut index_writer = InvertedIndexWriter::try_create(schema)?;

for (location, ver) in segment_locs {
let segment_info = segment_reader
.read(&LoadParams {
location: location.to_string(),
len_hint: None,
ver,
put_cache: false,
})
.await?;

let block_metas = segment_info.block_metas()?;
for block_meta in block_metas {
let block = block_reader
.read_by_meta(&settings, &block_meta, &storage_format)
.await?;

index_writer.add_block(block)?;
}
}

let location_generator = fuse_table.meta_location_generator();

let index_location = index_writer.finalize(operator, location_generator).await?;
// TODO: add index location to meta
Ok(index_location)
}
}
52 changes: 52 additions & 0 deletions src/query/ee/src/inverted_index/inverted_index_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_inverted_index::InvertedIndexHandler;
use databend_enterprise_inverted_index::InvertedIndexHandlerWrapper;
use databend_storages_common_table_meta::meta::Location;

use super::indexer::Indexer;

pub struct RealInvertedIndexHandler {}

#[async_trait::async_trait]
impl InvertedIndexHandler for RealInvertedIndexHandler {
#[async_backtrace::framed]
async fn do_refresh_index(
&self,
fuse_table: &FuseTable,
ctx: Arc<dyn TableContext>,
schema: DataSchema,
segment_locs: Option<Vec<Location>>,
) -> Result<String> {
let indexer = Indexer::new();
indexer.index(fuse_table, ctx, schema, segment_locs).await
}
}

impl RealInvertedIndexHandler {
pub fn init() -> Result<()> {
let rm = RealInvertedIndexHandler {};
let wrapper = InvertedIndexHandlerWrapper::new(Box::new(rm));
GlobalInstance::set(Arc::new(wrapper));
Ok(())
}
}
17 changes: 17 additions & 0 deletions src/query/ee/src/inverted_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod indexer;
mod inverted_index_handler;
pub use inverted_index_handler::RealInvertedIndexHandler;
1 change: 1 addition & 0 deletions src/query/ee/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod aggregating_index;
pub mod background_service;
pub mod data_mask;
pub mod enterprise_services;
pub mod inverted_index;
pub mod license;
pub mod storage_encryption;
pub mod storages;
Expand Down
2 changes: 2 additions & 0 deletions src/query/ee/src/test_kits/mock_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_license::license_manager::LicenseManagerWrapper;

use crate::aggregating_index::RealAggregatingIndexHandler;
use crate::data_mask::RealDatamaskHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::RealLicenseManager;
use crate::storages::fuse::operations::RealVacuumHandler;
use crate::stream::RealStreamHandler;
Expand All @@ -40,6 +41,7 @@ impl MockServices {
RealDatamaskHandler::init()?;
RealVirtualColumnHandler::init()?;
RealStreamHandler::init()?;
RealInvertedIndexHandler::init()?;
Ok(())
}
}
74 changes: 74 additions & 0 deletions src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_storages_fuse::io::read::InvertedIndexReader;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_inverted_index::get_inverted_index_handler;
use databend_enterprise_query::test_kits::context::EESetup;
use databend_query::test_kits::append_string_sample_data;
use databend_query::test_kits::*;

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;

fixture
.default_session()
.get_settings()
.set_data_retention_time_in_days(0)?;
fixture.create_default_database().await?;
fixture.create_string_table().await?;

let number_of_block = 2;
append_string_sample_data(number_of_block, &fixture).await?;

let table = fixture.latest_default_table().await?;
let table_schema = table.schema();
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let dal = fuse_table.get_operator_ref();

let table_ctx = fixture.new_query_ctx().await?;
let schema = DataSchema::from(table_schema);

let handler = get_inverted_index_handler();
let location = handler
.do_refresh_index(fuse_table, table_ctx.clone(), schema.clone(), None)
.await?;

let index_reader = InvertedIndexReader::create(dal.clone(), schema);

let num = 5;
let query = "rust";
let docs = index_reader.do_read(location.clone(), query, num)?;
assert_eq!(docs.len(), 2);
assert_eq!(docs[0].1.doc_id, 0);
assert_eq!(docs[1].1.doc_id, 1);

let query = "java";
let docs = index_reader.do_read(location.clone(), query, num)?;
assert_eq!(docs.len(), 1);
assert_eq!(docs[0].1.doc_id, 2);

let query = "data";
let docs = index_reader.do_read(location, query, num)?;
assert_eq!(docs.len(), 3);
assert_eq!(docs[0].1.doc_id, 4);
assert_eq!(docs[1].1.doc_id, 1);
assert_eq!(docs[2].1.doc_id, 5);

Ok(())
}
15 changes: 15 additions & 0 deletions src/query/ee/tests/it/inverted_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod index_refresh;
1 change: 1 addition & 0 deletions src/query/ee/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![feature(unwrap_infallible)]
mod aggregating_index;
mod background_service;
mod inverted_index;
mod license;
mod storages;
Loading

0 comments on commit d6b0a07

Please sign in to comment.