diff --git a/include/vsag/constants.h b/include/vsag/constants.h index 055e9e7c..e5a461f4 100644 --- a/include/vsag/constants.h +++ b/include/vsag/constants.h @@ -109,6 +109,10 @@ extern const char* const HGRAPH_BUILD_EF_CONSTRUCTION; extern const char* const HGRAPH_INIT_CAPACITY; extern const char* const HGRAPH_BUILD_THREAD_COUNT; extern const char* const HGRAPH_PRECISE_QUANTIZATION_TYPE; +extern const char* const HGRAPH_BASE_IO_TYPE; +extern const char* const HGRAPH_BASE_FILE_PATH; +extern const char* const HGRAPH_PRECISE_IO_TYPE; +extern const char* const HGRAPH_PRECISE_FILE_PATH; extern const char* const BRUTE_FORCE_QUANTIZATION_TYPE; extern const char* const BRUTE_FORCE_IO_TYPE; diff --git a/src/constants.cpp b/src/constants.cpp index f0220f11..8728befa 100644 --- a/src/constants.cpp +++ b/src/constants.cpp @@ -115,4 +115,9 @@ const char* const HGRAPH_PRECISE_QUANTIZATION_TYPE = "precise_quantization_type" const char* const BRUTE_FORCE_QUANTIZATION_TYPE = "quantization_type"; const char* const BRUTE_FORCE_IO_TYPE = "io_type"; +const char* const HGRAPH_BASE_IO_TYPE = "base_io_type"; +const char* const HGRAPH_BASE_FILE_PATH = "base_file_path"; +const char* const HGRAPH_PRECISE_IO_TYPE = "precise_io_type"; +const char* const HGRAPH_PRECISE_FILE_PATH = "precise_file_path"; + }; // namespace vsag diff --git a/src/data_cell/flatten_interface.cpp b/src/data_cell/flatten_interface.cpp index 0d430cf8..d49bf867 100644 --- a/src/data_cell/flatten_interface.cpp +++ b/src/data_cell/flatten_interface.cpp @@ -73,9 +73,10 @@ FlattenInterface::MakeInstance(const FlattenDataCellParamPtr& param, auto io_type_name = param->io_parameter_->GetTypeName(); if (io_type_name == IO_TYPE_VALUE_BLOCK_MEMORY_IO) { return make_instance(param, common_param); - } - if (io_type_name == IO_TYPE_VALUE_MEMORY_IO) { + } else if (io_type_name == IO_TYPE_VALUE_MEMORY_IO) { return make_instance(param, common_param); + } else if (io_type_name == IO_TYPE_VALUE_BUFFER_IO) { + return make_instance(param, 
common_param); } return nullptr; } diff --git a/src/index/hgraph_index_zparameters.cpp b/src/index/hgraph_index_zparameters.cpp index fd1064c0..fb490721 100644 --- a/src/index/hgraph_index_zparameters.cpp +++ b/src/index/hgraph_index_zparameters.cpp @@ -30,6 +30,10 @@ static const std::unordered_map> EXTERNAL_ {HGRAPH_USE_REORDER, {HGRAPH_USE_REORDER_KEY}}, {HGRAPH_BASE_QUANTIZATION_TYPE, {HGRAPH_BASE_CODES_KEY, QUANTIZATION_PARAMS_KEY, QUANTIZATION_TYPE_KEY}}, + {HGRAPH_BASE_IO_TYPE, {HGRAPH_BASE_CODES_KEY, IO_PARAMS_KEY, IO_TYPE_KEY}}, + {HGRAPH_PRECISE_IO_TYPE, {HGRAPH_PRECISE_CODES_KEY, IO_PARAMS_KEY, IO_TYPE_KEY}}, + {HGRAPH_BASE_FILE_PATH, {HGRAPH_BASE_CODES_KEY, IO_PARAMS_KEY, IO_FILE_PATH}}, + {HGRAPH_PRECISE_FILE_PATH, {HGRAPH_PRECISE_CODES_KEY, IO_PARAMS_KEY, IO_FILE_PATH}}, {HGRAPH_PRECISE_QUANTIZATION_TYPE, {HGRAPH_PRECISE_CODES_KEY, QUANTIZATION_PARAMS_KEY, QUANTIZATION_TYPE_KEY}}, {HGRAPH_GRAPH_MAX_DEGREE, {HGRAPH_GRAPH_KEY, GRAPH_PARAM_MAX_DEGREE}}, @@ -44,14 +48,16 @@ static const std::string HGRAPH_PARAMS_TEMPLATE = "{HGRAPH_USE_REORDER_KEY}": false, "{HGRAPH_GRAPH_KEY}": { "{IO_PARAMS_KEY}": { - "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}" + "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}", + "{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}" }, "{GRAPH_PARAM_MAX_DEGREE}": 64, "{GRAPH_PARAM_INIT_MAX_CAPACITY}": 100 }, "{HGRAPH_BASE_CODES_KEY}": { "{IO_PARAMS_KEY}": { - "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}" + "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}", + "{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}" }, "codes_type": "flatten_codes", "{QUANTIZATION_PARAMS_KEY}": { @@ -62,7 +68,8 @@ static const std::string HGRAPH_PARAMS_TEMPLATE = }, "{HGRAPH_PRECISE_CODES_KEY}": { "{IO_PARAMS_KEY}": { - "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}" + "{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}", + "{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}" }, "codes_type": "flatten_codes", "{QUANTIZATION_PARAMS_KEY}": { diff --git 
a/src/inner_string_params.h b/src/inner_string_params.h index feda3410..b4bd9f22 100644 --- a/src/inner_string_params.h +++ b/src/inner_string_params.h @@ -33,8 +33,11 @@ const char* const IO_PARAMS_KEY = "io_params"; // IO type const char* const IO_TYPE_KEY = "type"; const char* const IO_TYPE_VALUE_MEMORY_IO = "memory_io"; +const char* const IO_TYPE_VALUE_BUFFER_IO = "buffer_io"; const char* const IO_TYPE_VALUE_BLOCK_MEMORY_IO = "block_memory_io"; const char* const BLOCK_IO_BLOCK_SIZE_KEY = "block_size"; +const char* const IO_FILE_PATH = "file_path"; +const char* const DEFAULT_FILE_PATH_VALUE = "/tmp/default_file_path"; // quantization params key const char* const QUANTIZATION_PARAMS_KEY = "quantization_params"; @@ -76,6 +79,9 @@ const std::unordered_map DEFAULT_MAP = { {"BUILD_PARAMS_KEY", BUILD_PARAMS_KEY}, {"BUILD_THREAD_COUNT", BUILD_THREAD_COUNT}, {"BUILD_EF_CONSTRUCTION", BUILD_EF_CONSTRUCTION}, + {"IO_TYPE_VALUE_BUFFER_IO", IO_TYPE_VALUE_BUFFER_IO}, + {"IO_FILE_PATH", IO_FILE_PATH}, + {"DEFAULT_FILE_PATH_VALUE", DEFAULT_FILE_PATH_VALUE}, }; } // namespace vsag diff --git a/src/io/CMakeLists.txt b/src/io/CMakeLists.txt index 2cceecb0..7d9957ca 100644 --- a/src/io/CMakeLists.txt +++ b/src/io/CMakeLists.txt @@ -3,6 +3,7 @@ set (IO_SRC io_parameter.cpp memory_io_parameter.cpp memory_block_io_parameter.cpp + buffer_io_parameter.cpp ) add_library (io OBJECT ${IO_SRC}) diff --git a/src/io/buffer_io.h b/src/io/buffer_io.h new file mode 100644 index 00000000..7fb3f559 --- /dev/null +++ b/src/io/buffer_io.h @@ -0,0 +1,133 @@ + +// Copyright 2024-present the vsag project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "basic_io.h" +#include "buffer_io_parameter.h" +#include "index/index_common_param.h" + +namespace vsag { + +class BufferIO : public BasicIO { +public: + explicit BufferIO() = default; + + BufferIO(std::string filename, Allocator* allocator) + : filepath_(std::move(filename)), allocator_(allocator) { + this->fd_ = open(filepath_.c_str(), O_CREAT | O_RDWR, 0644); + } + + explicit BufferIO(const BufferIOParameterPtr& io_param, const IndexCommonParam& common_param) + : BufferIO(io_param->path_, common_param.allocator_.get()){}; + + explicit BufferIO(const IOParamPtr& param, const IndexCommonParam& common_param) + : BufferIO(std::dynamic_pointer_cast(param), common_param){}; + + ~BufferIO() override = default; + + inline void + WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) const { + auto ret = pwrite64(this->fd_, data, size, offset); + if (ret != size) { + throw std::runtime_error(fmt::format("write bytes {} less than {}", ret, size)); + } + } + + inline bool + ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const { + auto ret = pread64(this->fd_, data, size, offset); + if (ret != size) { + throw std::runtime_error(fmt::format("read bytes {} less than {}", ret, size)); + } + return true; + } + + [[nodiscard]] inline const uint8_t* + DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const { + need_release = true; + auto* buf = reinterpret_cast(allocator_->Allocate(size)); + ReadImpl(size, offset, buf); + return buf; + } + + inline void + ReleaseImpl(const uint8_t* data) 
const { + auto ptr = const_cast(data); + allocator_->Deallocate(ptr); + } + + inline bool + MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const { + bool ret = true; + for (uint64_t i = 0; i < count; ++i) { + ret &= ReadImpl(sizes[i], offsets[i], datas); + datas += sizes[i]; + } + return ret; + } + + inline void + PrefetchImpl(uint64_t offset, uint64_t cache_line = 64){}; + + inline void + SerializeImpl(StreamWriter& writer) { + struct stat file_stat; + fstat(fd_, &file_stat); + StreamWriter::WriteObj(writer, file_stat.st_size); + + auto block_size = Options::Instance().block_size_limit(); + auto* buf = reinterpret_cast(allocator_->Allocate(block_size)); + off64_t offset = 0; + auto read_bytes = pread64(this->fd_, buf, block_size, offset); + while (read_bytes > 0) { + writer.Write(reinterpret_cast(buf), read_bytes); + offset += read_bytes; + read_bytes = pread64(this->fd_, buf, block_size, offset); + } + this->allocator_->Deallocate(buf); + } + + inline void + DeserializeImpl(StreamReader& reader) { + uint64_t file_size; + StreamReader::ReadObj(reader, file_size); + auto block_size = Options::Instance().block_size_limit(); + auto* buf = reinterpret_cast(allocator_->Allocate(block_size)); + off64_t offset = 0; + while (file_size > 0) { + auto read_bytes = std::min(block_size, file_size); + reader.Read(reinterpret_cast(buf), read_bytes); + auto ret = pwrite64(this->fd_, buf, read_bytes, offset); + if (ret != read_bytes) { + throw std::runtime_error( + fmt::format("write bytes {} less than {}", ret, read_bytes)); + } + offset += read_bytes; + file_size -= read_bytes; + } + this->allocator_->Deallocate(buf); + } + +private: + std::string filepath_{}; + + Allocator* allocator_{nullptr}; + + int fd_{-1}; +}; +} // namespace vsag diff --git a/src/io/buffer_io_parameter.cpp b/src/io/buffer_io_parameter.cpp new file mode 100644 index 00000000..84d2c5fd --- /dev/null +++ b/src/io/buffer_io_parameter.cpp @@ -0,0 +1,43 @@ + +// Copyright 
2024-present the vsag project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "buffer_io_parameter.h" + +#include "inner_string_params.h" + +namespace vsag { + +BufferIOParameter::BufferIOParameter() : IOParameter(IO_TYPE_VALUE_BUFFER_IO) { +} + +BufferIOParameter::BufferIOParameter(const vsag::JsonType& json) + : IOParameter(IO_TYPE_VALUE_BUFFER_IO) { + this->FromJson(json); +} + +void +BufferIOParameter::FromJson(const JsonType& json) { + CHECK_ARGUMENT(json.contains(IO_FILE_PATH), "miss file_path param in buffer io type"); + this->path_ = json[IO_FILE_PATH]; +} + +JsonType +BufferIOParameter::ToJson() { + JsonType json; + json[IO_TYPE_KEY] = IO_TYPE_VALUE_BUFFER_IO; + json[IO_FILE_PATH] = this->path_; + return json; +} +} // namespace vsag diff --git a/src/io/buffer_io_parameter.h b/src/io/buffer_io_parameter.h new file mode 100644 index 00000000..c2994862 --- /dev/null +++ b/src/io/buffer_io_parameter.h @@ -0,0 +1,39 @@ + +// Copyright 2024-present the vsag project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "io_parameter.h" + +namespace vsag { +class BufferIOParameter : public IOParameter { +public: + BufferIOParameter(); + + explicit BufferIOParameter(const JsonType& json); + + void + FromJson(const JsonType& json) override; + + JsonType + ToJson() override; + +public: + std::string path_{}; +}; + +using BufferIOParameterPtr = std::shared_ptr; + +} // namespace vsag diff --git a/src/io/buffer_io_test.cpp b/src/io/buffer_io_test.cpp new file mode 100644 index 00000000..d7ce60f2 --- /dev/null +++ b/src/io/buffer_io_test.cpp @@ -0,0 +1,60 @@ + +// Copyright 2024-present the vsag project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "buffer_io.h"
+
+// NOTE(review): the two header names below are missing from the reviewed text -- restore
+// them from the original patch before applying.
+#include
+#include
+
+#include "basic_io_test.h"
+#include "safe_allocator.h"
+
+using namespace vsag;
+
+// Constructs a BufferIO directly from a file path and an allocator, then runs the shared
+// TestBasicReadWrite check on it.
+// NOTE(review): the template arguments of the std::make_unique calls in this file are
+// missing from the reviewed text (presumably BufferIO) -- confirm.
+TEST_CASE("read&write", "[ut][buffer_io]") {
+    fixtures::TempDir dir("buffer_io");
+    auto path = dir.GenerateRandomFile();
+    auto allocator = SafeAllocator::FactoryDefaultAllocator();
+    auto io = std::make_unique(path, allocator.get());
+    TestBasicReadWrite(*io);
+}
+
+// Constructs the IO through the parameter path: json -> IOParameter::GetIOParameterByJson
+// -> constructor taking (io_param, common_param), then runs TestBasicReadWrite.
+TEST_CASE("param", "[ut][buffer_io]") {
+    fixtures::TempDir dir("buffer_io");
+    auto path = dir.GenerateRandomFile();
+    auto allocator = SafeAllocator::FactoryDefaultAllocator();
+    // The doubled braces are escapes for fmt::format; the single "{}" receives the path.
+    std::string param_str = R"(
+    {{
+        "type": "buffer_io",
+        "file_path" : "{}"
+    }}
+    )";
+    auto json = JsonType::parse(fmt::format(param_str, path));
+    auto io_param = IOParameter::GetIOParameterByJson(json);
+    IndexCommonParam common_param;
+    common_param.allocator_ = allocator;
+    auto io = std::make_unique(io_param, common_param);
+    TestBasicReadWrite(*io);
+}
+
+// Creates two IOs on two different files and passes them to TestSerializeAndDeserialize
+// as the write side (wio) and the read side (rio).
+TEST_CASE("serialize&deserialize", "[ut][buffer_io]") {
+    auto allocator = SafeAllocator::FactoryDefaultAllocator();
+    fixtures::TempDir dir("buffer_io");
+    auto path1 = dir.GenerateRandomFile();
+    auto path2 = dir.GenerateRandomFile();
+    auto wio = std::make_unique(path1, allocator.get());
+    auto rio = std::make_unique(path2, allocator.get());
+    TestSerializeAndDeserialize(*wio, *rio);
+}
diff --git a/src/io/io_headers.h b/src/io/io_headers.h
index 4639735a..4bc0b3c7 100644
--- a/src/io/io_headers.h
+++ b/src/io/io_headers.h
@@ -16,5 +16,6 @@
 #pragma once
 
 #include "basic_io.h"
+#include "buffer_io.h"
 #include "memory_block_io.h"
 #include "memory_io.h"
diff --git a/src/io/io_parameter.cpp b/src/io/io_parameter.cpp
index eaadd467..83751989 100644
--- a/src/io/io_parameter.cpp
+++ b/src/io/io_parameter.cpp
@@ -15,6 +15,7 @@
 
 #include "io_parameter.h"
 
+#include "buffer_io_parameter.h"
 #include "inner_string_params.h"
 #include "memory_block_io_parameter.h"
 #include "memory_io_parameter.h"
@@ -32,6 +33,9 @@
IOParameter::GetIOParameterByJson(const JsonType& json) { } else if (type_name == IO_TYPE_VALUE_BLOCK_MEMORY_IO) { io_ptr = std::make_shared(); io_ptr->FromJson(json); + } else if (type_name == IO_TYPE_VALUE_BUFFER_IO) { + io_ptr = std::make_shared(); + io_ptr->FromJson(json); } } catch (std::invalid_argument& error) { return nullptr; diff --git a/tests/test_hgraph.cpp b/tests/test_hgraph.cpp index e3aea77f..2e664933 100644 --- a/tests/test_hgraph.cpp +++ b/tests/test_hgraph.cpp @@ -45,7 +45,11 @@ class HgraphTestIndex : public fixtures::TestIndex { }})"; const std::vector> test_cases = { - {"sq8_uniform,fp32", 0.98}, {"sq8", 0.96}, {"fp32", 0.99}, {"sq8_uniform", 0.95}}; + {"sq8_uniform,fp32,buffer_io", 0.98}, + {"sq8_uniform,fp32", 0.98}, + {"sq8", 0.96}, + {"fp32", 0.99}, + {"sq8_uniform", 0.95}}; }; TestDatasetPool HgraphTestIndex::pool{}; @@ -69,7 +73,8 @@ HgraphTestIndex::GenerateHGraphBuildParametersString(const std::string& metric_t "max_degree": 96, "ef_construction": 500, "build_thread_count": {}, - "precise_quantization_type": "{}" + "precise_quantization_type": "{}", + "precise_io_type": "{}" }} }} )"; @@ -89,17 +94,21 @@ HgraphTestIndex::GenerateHGraphBuildParametersString(const std::string& metric_t )"; auto strs = fixtures::SplitString(quantization_str, ','); - std::string high_quantizer_str; + std::string high_quantizer_str, precise_io_type = "block_memory_io"; auto& base_quantizer_str = strs[0]; if (strs.size() > 1) { high_quantizer_str = strs[1]; + if (strs.size() > 2) { + precise_io_type = strs[2]; + } build_parameters_str = fmt::format(parameter_temp_reorder, metric_type, dim, true, /* reorder */ base_quantizer_str, thread_count, - high_quantizer_str); + high_quantizer_str, + precise_io_type); } else { build_parameters_str = fmt::format(parameter_temp_origin, metric_type, dim, base_quantizer_str, thread_count);