Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: pass partition key scalar info if enable for vector mem index #39123

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ struct TypeTraits<DataType::INT32> {

template <>
struct TypeTraits<DataType::INT64> {
using NativeType = int32_t;
using NativeType = int64_t;
static constexpr DataType TypeKind = DataType::INT64;
static constexpr bool IsPrimitiveType = true;
static constexpr bool IsFixedWidth = true;
Expand Down
13 changes: 13 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@
auto field_datas =
file_manager_->CacheRawDataToMemory(insert_files.value());

auto opt_fields = GetValueFromConfig<OptFieldT>(config, VEC_OPT_FIELDS);
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>> scalar_info;
if (opt_fields.has_value() && index_.IsAdditionalScalarSupported() &&
config.value("partition_key_isolation", false)) {
scalar_info = file_manager_->CacheOptFieldToMemory(opt_fields.value());

Check warning on line 292 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L288-L292

Added lines #L288 - L292 were not covered by tests
}

Config build_config;
build_config.update(config);
build_config.erase("insert_files");
Expand Down Expand Up @@ -312,6 +319,9 @@
field_datas.clear();

auto dataset = GenDataset(total_num_rows, dim, buf.get());
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));

Check warning on line 323 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L322-L323

Added lines #L322 - L323 were not covered by tests
}
BuildWithDataset(dataset, build_config);
} else {
// sparse
Expand Down Expand Up @@ -342,6 +352,9 @@
}
auto dataset = GenDataset(total_rows, dim, vec.data());
dataset->SetIsSparse(true);
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));

Check warning on line 356 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L355-L356

Added lines #L355 - L356 were not covered by tests
}
BuildWithDataset(dataset, build_config);
}
}
Expand Down
8 changes: 4 additions & 4 deletions internal/core/src/query/PlanProto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@
// currently, iterative filter does not support range search
if (!search_info.search_params_.contains(RADIUS)) {
if (query_info_proto.hints() != "") {
if (query_info_proto.hints() == ITERATIVE_FILTER) {
if (query_info_proto.hints() == "disable") {
search_info.iterative_filter_execution = false;

Check warning on line 60 in internal/core/src/query/PlanProto.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/query/PlanProto.cpp#L60

Added line #L60 was not covered by tests
} else if (query_info_proto.hints() == ITERATIVE_FILTER) {
search_info.iterative_filter_execution = true;
} else {
// check if hints is valid
PanicInfo(ConfigInvalid,
"hints: {} not supported",
query_info_proto.hints());
}
}
if (!search_info.iterative_filter_execution &&
search_info.search_params_.contains(HINTS)) {
} else if (search_info.search_params_.contains(HINTS)) {
if (search_info.search_params_[HINTS] == ITERATIVE_FILTER) {
search_info.iterative_filter_execution = true;
} else {
Expand Down
52 changes: 3 additions & 49 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,16 +359,6 @@ DiskFileManagerImpl::CacheTextLogToDisk(
}
}

// Sorts `paths` in ascending order of the integer parsed from the text that
// follows the last '/' of each path (the whole string when there is no '/').
//
// The id is parsed with std::stoll instead of std::stol: `long` is only 32
// bits wide on LLP64/MinGW targets, where an id above INT32_MAX would make
// std::stol throw std::out_of_range. As before, a trailing component that does
// not start with a number makes the parse throw std::invalid_argument.
void
SortByPath(std::vector<std::string>& paths) {
    // find_last_of returns npos when no '/' exists; npos + 1 wraps to 0, so
    // the whole string is parsed in that case.
    const auto trailing_id = [](const std::string& path) -> long long {
        return std::stoll(path.substr(path.find_last_of('/') + 1));
    };
    std::sort(paths.begin(),
              paths.end(),
              [&trailing_id](const std::string& a, const std::string& b) {
                  return trailing_id(a) < trailing_id(b);
              });
}

template <typename DataType>
std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
Expand Down Expand Up @@ -480,20 +470,6 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
return local_data_path;
}

template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;

template <DataType T>
bool
WriteOptFieldIvfDataImpl(
Expand All @@ -515,7 +491,7 @@ WriteOptFieldIvfDataImpl(
}

// Do not write to disk if there is only one value
if (mp.size() == 1) {
if (mp.size() <= 1) {
return false;
}

Expand Down Expand Up @@ -626,23 +602,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
local_chunk_manager->CreateFile(local_data_path);

std::vector<FieldDataPtr> field_datas;
std::vector<std::string> batch_files;
uint64_t write_offset = 0;
WriteOptFieldsIvfMeta(
local_chunk_manager, local_data_path, num_of_fields, write_offset);

auto FetchRawData = [&]() {
auto fds = GetObjectData(rcm_.get(), batch_files);
for (size_t i = 0; i < batch_files.size(); ++i) {
auto data = fds[i].get()->GetFieldData();
field_datas.emplace_back(data);
}
};

auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::unordered_set<int64_t> actual_field_ids;
for (auto& [field_id, tup] : fields_map) {
const auto& field_type = std::get<1>(tup);
Expand All @@ -652,19 +615,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
return "";
}

std::vector<FieldDataPtr>().swap(field_datas);
SortByPath(field_paths);
std::vector<FieldDataPtr> field_datas =
FetchFieldData(rcm_.get(), field_paths);

for (auto& file : field_paths) {
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
}
if (batch_files.size() > 0) {
FetchRawData();
}
if (WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
Expand Down
91 changes: 85 additions & 6 deletions internal/core/src/storage/MemFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,7 @@
std::vector<FieldDataPtr>
MemFileManagerImpl::CacheRawDataToMemory(
std::vector<std::string> remote_files) {
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
});
SortByPath(remote_files);

auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
Expand Down Expand Up @@ -161,6 +156,90 @@
return field_datas;
}

template <DataType T>
std::vector<std::vector<uint32_t>>
GetOptFieldIvfDataImpl(const std::vector<FieldDataPtr>& field_datas) {

Check warning on line 161 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L161

Added line #L161 was not covered by tests
using FieldDataT = DataTypeNativeOrVoid<T>;
std::unordered_map<FieldDataT, std::vector<uint32_t>> mp;

Check warning on line 163 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L163

Added line #L163 was not covered by tests
uint32_t offset = 0;
for (const auto& field_data : field_datas) {
for (int64_t i = 0; i < field_data->get_num_rows(); ++i) {
auto val =
*reinterpret_cast<const FieldDataT*>(field_data->RawValue(i));
mp[val].push_back(offset++);

Check warning on line 169 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L165-L169

Added lines #L165 - L169 were not covered by tests
}
}

// opt field data is not used if there is only one value
if (mp.size() <= 1) {
return {};

Check warning on line 175 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L174-L175

Added lines #L174 - L175 were not covered by tests
}
std::vector<std::vector<uint32_t>> scalar_info;
scalar_info.reserve(mp.size());
for (auto& [field_id, tup] : mp) {
scalar_info.emplace_back(std::move(tup));

Check warning on line 180 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L177-L180

Added lines #L177 - L180 were not covered by tests
}
LOG_INFO("Get opt fields with {} categories", scalar_info.size());

Check warning on line 182 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L182

Added line #L182 was not covered by tests
return scalar_info;
}

Check warning on line 184 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L184

Added line #L184 was not covered by tests

std::vector<std::vector<uint32_t>>
GetOptFieldIvfData(const DataType& dt,

Check warning on line 187 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L187

Added line #L187 was not covered by tests
const std::vector<FieldDataPtr>& field_datas) {
switch (dt) {
case DataType::BOOL:
return GetOptFieldIvfDataImpl<DataType::BOOL>(field_datas);
case DataType::INT8:
return GetOptFieldIvfDataImpl<DataType::INT8>(field_datas);
case DataType::INT16:
return GetOptFieldIvfDataImpl<DataType::INT16>(field_datas);
case DataType::INT32:
return GetOptFieldIvfDataImpl<DataType::INT32>(field_datas);
case DataType::INT64:
return GetOptFieldIvfDataImpl<DataType::INT64>(field_datas);
case DataType::FLOAT:
return GetOptFieldIvfDataImpl<DataType::FLOAT>(field_datas);
case DataType::DOUBLE:
return GetOptFieldIvfDataImpl<DataType::DOUBLE>(field_datas);
case DataType::STRING:
return GetOptFieldIvfDataImpl<DataType::STRING>(field_datas);
case DataType::VARCHAR:
return GetOptFieldIvfDataImpl<DataType::VARCHAR>(field_datas);
default:
LOG_WARN("Unsupported data type in optional scalar field: ", dt);
return {};

Check warning on line 210 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L189-L210

Added lines #L189 - L210 were not covered by tests
}
return {};
}

// Loads the optional scalar field described by `fields_map` and groups its row
// offsets via GetOptFieldIvfData.
//
// Returns a map from field id to the value returned by GetOptFieldIvfData for
// that field. Returns an empty map when `fields_map` is empty or when the
// field has no file paths. Calls PanicInfo with ErrorCode::NotImplemented when
// `fields_map` holds more than one field.
//
// Side effect: the path list stored inside `fields_map` is sorted in place by
// SortByPath.
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>>
MemFileManagerImpl::CacheOptFieldToMemory(OptFieldT& fields_map) {
const uint32_t num_of_fields = fields_map.size();
if (0 == num_of_fields) {
return {};
} else if (num_of_fields > 1) {
// Only a single optional field is handled; anything more is rejected.
PanicInfo(

Check warning on line 221 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L216-L221

Added lines #L216 - L221 were not covered by tests
ErrorCode::NotImplemented,
"vector index build with multiple fields is not supported yet");
}

std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>> res;
for (auto& [field_id, tup] : fields_map) {
// Tuple element 1 is used as the field's DataType, element 2 as its list of
// file paths (element 0 is not read here).
const auto& field_type = std::get<1>(tup);
auto& field_paths = std::get<2>(tup);
if (0 == field_paths.size()) {
LOG_WARN("optional field {} has no data", field_id);
// Abort the whole call with an empty result rather than skipping the field.
return {};

Check warning on line 232 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L226-L232

Added lines #L226 - L232 were not covered by tests
}

// Sort the paths before fetching so that data is read in path order.
SortByPath(field_paths);
std::vector<FieldDataPtr> field_datas =
FetchFieldData(rcm_.get(), field_paths);
// NOTE(review): an empty grouping (e.g. a single distinct value) is still
// stored under field_id, so `res` itself is non-empty - confirm callers that
// test res.empty() expect this.
res[field_id] = GetOptFieldIvfData(field_type, field_datas);
}
return res;

Check warning on line 240 in internal/core/src/storage/MemFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/MemFileManagerImpl.cpp#L235-L240

Added lines #L235 - L240 were not covered by tests
}

std::optional<bool>
MemFileManagerImpl::IsExisted(const std::string& filename) noexcept {
// TODO: implement this interface
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/storage/MemFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>
#include <memory>
#include <unordered_map>

#include "storage/IndexData.h"
#include "storage/FileManager.h"
Expand Down Expand Up @@ -69,6 +70,9 @@ class MemFileManagerImpl : public FileManagerImpl {
return added_total_mem_size_;
}

std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>>
CacheOptFieldToMemory(OptFieldT& fields_map);

private:
// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;
Expand Down
28 changes: 28 additions & 0 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,4 +865,32 @@
return merged_data;
}

// Fetches the field data of every path in `remote_files` through `cm`,
// processing the paths in batches, and returns the collected FieldDataPtr
// values in the same order as `remote_files`.
std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& remote_files) {
std::vector<FieldDataPtr> field_datas;
std::vector<std::string> batch_files;
// Fetches the currently pending batch and appends one field data per batch
// entry to `field_datas`. It does not clear `batch_files` itself.
auto FetchRawData = [&]() {
auto fds = GetObjectData(cm, batch_files);
for (size_t i = 0; i < batch_files.size(); ++i) {
// NOTE(review): fds[i].get() looks like waiting on a future-like handle per
// file - confirm against GetObjectData's return type.
auto data = fds[i].get()->GetFieldData();
field_datas.emplace_back(data);
}
};

// Number of files that are requested together in one GetObjectData call.
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

for (auto& file : remote_files) {
// Flush the pending batch once it has reached parallel_degree entries,
// before queueing the next file.
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();

Check warning on line 886 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L885-L886

Added lines #L885 - L886 were not covered by tests
}
batch_files.emplace_back(file);
}
// Fetch whatever is left in the last, possibly partial, batch.
if (batch_files.size() > 0) {
FetchRawData();
}
return field_datas;
}

} // namespace milvus::storage
27 changes: 27 additions & 0 deletions internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,31 @@
FieldDataPtr
MergeFieldData(std::vector<FieldDataPtr>& data_array);

template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;

// Fetches the field data stored in `remote_files` through `cm` and returns the
// collected FieldDataPtr values.
// The parameter is named `remote_files` to match the definition in Util.cpp,
// where `batch_files` is the name of the internal per-batch buffer.
std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& remote_files);

// Sorts `paths` in ascending order of the integer parsed from the component
// after the last '/' of each path (the whole string when it has no '/').
//
// Parsing uses std::stoll rather than std::stol so that 64-bit ids are handled
// on targets where `long` is 32 bits wide (LLP64/MinGW); with std::stol such
// ids would raise std::out_of_range. A trailing component that does not begin
// with a number still raises std::invalid_argument, as it did before.
inline void
SortByPath(std::vector<std::string>& paths) {
    std::sort(paths.begin(),
              paths.end(),
              [](const std::string& a, const std::string& b) {
                  // find_last_of yields npos when there is no '/'; npos + 1
                  // wraps to 0, which selects the entire string.
                  return std::stoll(a.substr(a.find_last_of('/') + 1)) <
                         std::stoll(b.substr(b.find_last_of('/') + 1));
              });
}

} // namespace milvus::storage
2 changes: 1 addition & 1 deletion internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}

hasPartitionKey := hasParitionKeyModeField(t.schema)
hasPartitionKey := hasPartitionKeyModeField(t.schema)
if _, err := validatePartitionKeyIsolation(ctx, t.CollectionName, hasPartitionKey, t.GetProperties()...); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask, plan *pl
if err != nil {
return err
}
// force set hints to disable
queryInfo.Hints = "disable"
}
queryInfo.MaterializedViewInvolved = true
} else {
Expand Down
Loading
Loading