Skip to content

Commit

Permalink
enhance: Load raw data when the scalar index doesn't have raw data (#28888)
Browse files Browse the repository at this point in the history
issue: #28886

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 6, 2023
1 parent 6a86ac0 commit fb089cd
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 48 deletions.
3 changes: 3 additions & 0 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class IndexBase {
virtual BinarySet
UploadV2(const Config& config = {}) = 0;

// Reports whether this index can supply the original field values itself.
// Callers use the answer to decide whether values may be recovered from the
// index or must be read from separately loaded field data.
virtual const bool
HasRawData() const = 0;

bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/ScalarIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
Query(const DatasetPtr& dataset);

// Kept pure virtual at the scalar-index level: every concrete scalar index
// must state whether the original values can be recovered from it.
virtual const bool
HasRawData() const override = 0;

virtual int64_t
Size() = 0;
};
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/ScalarIndexSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class ScalarIndexSort : public ScalarIndex<T> {
BinarySet
UploadV2(const Config& config = {}) override;

// The sort-based scalar index always reports that it holds the raw values,
// so callers that check this flag take the index-lookup path.
const bool
HasRawData() const override {
return true;
}

private:
bool
ShouldSkip(const T lower_value, const T upper_value, const OpType op);
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/StringIndexMarisa.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class StringIndexMarisa : public StringIndex {
BinarySet
UploadV2(const Config& config = {});

// The Marisa string index always reports that it holds the raw values,
// so callers that check this flag take the index-lookup path.
const bool
HasRawData() const override {
return true;
}

private:
void
fill_str_ids(size_t n, const std::string* values);
Expand Down
96 changes: 80 additions & 16 deletions internal/core/src/query/visitors/ExecExprVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2280,8 +2280,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<bool>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<bool>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2297,8 +2305,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int8_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int8_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2314,8 +2330,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int16_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int16_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2331,8 +2355,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int32_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int32_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2348,8 +2380,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int64_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int64_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2365,8 +2405,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<float>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<float>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand All @@ -2382,8 +2430,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<double>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<double>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand Down Expand Up @@ -2411,8 +2467,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
auto& indexing =
segment_.chunk_scalar_index<std::string>(field_id,
chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<std::string>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
Expand Down
69 changes: 42 additions & 27 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
auto data_type = field_meta.get_data_type();

// Don't allow raw data and index exist at the same time
AssertInfo(!get_bit(index_ready_bitset_, field_id),
"field data can't be loaded when indexing exists");
// AssertInfo(!get_bit(index_ready_bitset_, field_id),
// "field data can't be loaded when indexing exists");

std::shared_ptr<ColumnBase> column{};
if (datatype_is_variable(data_type)) {
Expand Down Expand Up @@ -1071,30 +1071,11 @@ SegmentSealedImpl::fill_with_empty(FieldId field_id, int64_t count) const {
}

std::unique_ptr<DataArray>
SegmentSealedImpl::bulk_subscript(FieldId field_id,
const int64_t* seg_offsets,
int64_t count) const {
auto& field_meta = schema_->operator[](field_id);
// if count == 0, return empty data array
if (count == 0) {
return fill_with_empty(field_id, count);
}

if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!datatype_is_vector(field_meta.get_data_type())) {
AssertInfo(num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto index = chunk_index_impl(field_id, 0);
return ReverseDataFromIndex(index, seg_offsets, count, field_meta);
}

return get_vector(field_id, seg_offsets, count);
}

Assert(get_bit(field_data_ready_bitset_, field_id));

// DO NOT directly access the column byh map like: `fields_.at(field_id)->Data()`,
SegmentSealedImpl::get_raw_data(FieldId field_id,
const FieldMeta& field_meta,
const int64_t* seg_offsets,
int64_t count) const {
// DO NOT directly access the column by map like: `fields_.at(field_id)->Data()`,
// we have to clone the shared pointer,
// to make sure it won't get released if segment released
auto column = fields_.at(field_id);
Expand Down Expand Up @@ -1235,10 +1216,39 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
field_meta.get_data_type()));
}
}

return ret;
}

// Fetch `count` rows of field `field_id` at the given segment offsets.
//
// Where the values come from:
//   - count == 0            -> fill_with_empty
//   - no index on the field -> raw field data (asserted to be loaded)
//   - vector field w/ index -> get_vector
//   - scalar field w/ index -> reversed from the index when the index holds
//                              raw data, otherwise read from raw field data
std::unique_ptr<DataArray>
SegmentSealedImpl::bulk_subscript(FieldId field_id,
                                  const int64_t* seg_offsets,
                                  int64_t count) const {
    auto& field_meta = schema_->operator[](field_id);

    // if count == 0, return empty data array
    if (count == 0) {
        return fill_with_empty(field_id, count);
    }

    // No index loaded: the raw column is the only possible source.
    if (!HasIndex(field_id)) {
        Assert(get_bit(field_data_ready_bitset_, field_id));
        return get_raw_data(field_id, field_meta, seg_offsets, count);
    }

    // Indexed vector fields go through the vector path.
    if (datatype_is_vector(field_meta.get_data_type())) {
        return get_vector(field_id, seg_offsets, count);
    }

    // Indexed scalar field: a sealed segment is expected to have one chunk.
    AssertInfo(num_chunk() == 1,
               "num chunk not equal to 1 for sealed segment");
    auto scalar_index = chunk_index_impl(field_id, 0);
    if (!scalar_index->HasRawData()) {
        // The index cannot reproduce the values; fall back to the raw column.
        return get_raw_data(field_id, field_meta, seg_offsets, count);
    }
    return ReverseDataFromIndex(scalar_index, seg_offsets, count, field_meta);
}

bool
SegmentSealedImpl::HasIndex(FieldId field_id) const {
std::shared_lock lck(mutex_);
Expand Down Expand Up @@ -1271,6 +1281,11 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const {
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
return scalar_index->second->HasRawData();
}
}
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<DataArray>
fill_with_empty(FieldId field_id, int64_t count) const;

// Helper used by bulk_subscript to read `count` rows at `seg_offsets` from the
// loaded field column rather than from an index.
// NOTE(review): presumably requires the field's raw data to be loaded
// beforehand — confirm against the definition.
std::unique_ptr<DataArray>
get_raw_data(FieldId field_id,
const FieldMeta& field_meta,
const int64_t* seg_offsets,
int64_t count) const;

void
update_row_count(int64_t row_count) {
// if (row_count_opt_.has_value()) {
Expand Down
23 changes: 22 additions & 1 deletion internal/core/unittest/test_scalar_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ TYPED_TEST_P(TypedScalarIndexTest, Count) {
}
}

// For every scalar index type available for T: build the index over `nb`
// generated values and check that it reports holding the raw data.
TYPED_TEST_P(TypedScalarIndexTest, HasRawData) {
    using T = TypeParam;
    auto dtype = milvus::GetDType<T>();
    auto index_types = GetIndexTypes<T>();
    for (const auto& index_type : index_types) {
        milvus::index::CreateIndexInfo create_index_info;
        create_index_info.field_type = milvus::DataType(dtype);
        create_index_info.index_type = index_type;
        auto index =
            milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
                create_index_info);
        auto scalar_index =
            dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
        // Fail the test cleanly instead of dereferencing a null pointer if the
        // factory returned something that is not a ScalarIndex<T>.
        ASSERT_TRUE(scalar_index != nullptr);
        auto arr = GenArr<T>(nb);
        scalar_index->Build(nb, arr.data());
        ASSERT_EQ(nb, scalar_index->Count());
        ASSERT_TRUE(scalar_index->HasRawData());
    }
}

TYPED_TEST_P(TypedScalarIndexTest, In) {
using T = TypeParam;
auto dtype = milvus::GetDType<T>();
Expand Down Expand Up @@ -200,7 +220,8 @@ REGISTER_TYPED_TEST_CASE_P(TypedScalarIndexTest,
NotIn,
Range,
Codec,
Reverse);
Reverse,
HasRawData);

INSTANTIATE_TYPED_TEST_CASE_P(ArithmeticCheck, TypedScalarIndexTest, ScalarT);

Expand Down
6 changes: 6 additions & 0 deletions internal/core/unittest/test_string_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ TEST_F(StringIndexMarisaTest, Build) {
index->Build(strs.size(), strs.data());
}

// A Marisa string index built over the fixture strings must report that it
// holds the raw data.
TEST_F(StringIndexMarisaTest, HasRawData) {
    auto marisa_index = milvus::index::CreateStringIndexMarisa();
    marisa_index->Build(nb, strs.data());
    ASSERT_TRUE(marisa_index->HasRawData());
}

TEST_F(StringIndexMarisaTest, Count) {
auto index = milvus::index::CreateStringIndexMarisa();
index->Build(nb, strs.data());
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
}
}
loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
loadFieldDataInfo.enableMmap(fieldID, mmapEnabled)

var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
Expand Down
Loading

0 comments on commit fb089cd

Please sign in to comment.