Skip to content

Commit

Permalink
introduce buffer io
Browse files Browse the repository at this point in the history
Signed-off-by: LHT129 <[email protected]>
  • Loading branch information
LHT129 committed Jan 15, 2025
1 parent b6ae713 commit 4a703c8
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 9 deletions.
4 changes: 4 additions & 0 deletions include/vsag/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ extern const char* const HGRAPH_BUILD_EF_CONSTRUCTION;
extern const char* const HGRAPH_INIT_CAPACITY;
extern const char* const HGRAPH_BUILD_THREAD_COUNT;
extern const char* const HGRAPH_PRECISE_QUANTIZATION_TYPE;
extern const char* const HGRAPH_BASE_IO_TYPE;
extern const char* const HGRAPH_BASE_FILE_PATH;
extern const char* const HGRAPH_PRECISE_IO_TYPE;
extern const char* const HGRAPH_PRECISE_FILE_PATH;

extern const char* const BRUTE_FORCE_QUANTIZATION_TYPE;
extern const char* const BRUTE_FORCE_IO_TYPE;
Expand Down
5 changes: 5 additions & 0 deletions src/constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,9 @@ const char* const HGRAPH_PRECISE_QUANTIZATION_TYPE = "precise_quantization_type"
const char* const BRUTE_FORCE_QUANTIZATION_TYPE = "quantization_type";
const char* const BRUTE_FORCE_IO_TYPE = "io_type";

const char* const HGRAPH_BASE_IO_TYPE = "base_io_type";
const char* const HGRAPH_BASE_FILE_PATH = "base_file_path";
const char* const HGRAPH_PRECISE_IO_TYPE = "precise_io_type";
const char* const HGRAPH_PRECISE_FILE_PATH = "precise_file_path";

}; // namespace vsag
5 changes: 3 additions & 2 deletions src/data_cell/flatten_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ FlattenInterface::MakeInstance(const FlattenDataCellParamPtr& param,
auto io_type_name = param->io_parameter_->GetTypeName();
if (io_type_name == IO_TYPE_VALUE_BLOCK_MEMORY_IO) {
return make_instance<MemoryBlockIO>(param, common_param);
}
if (io_type_name == IO_TYPE_VALUE_MEMORY_IO) {
} else if (io_type_name == IO_TYPE_VALUE_MEMORY_IO) {
return make_instance<MemoryIO>(param, common_param);
} else if (io_type_name == IO_TYPE_VALUE_BUFFER_IO) {
return make_instance<BufferIO>(param, common_param);
}
return nullptr;
}
Expand Down
13 changes: 10 additions & 3 deletions src/index/hgraph_index_zparameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ static const std::unordered_map<std::string, std::vector<std::string>> EXTERNAL_
{HGRAPH_USE_REORDER, {HGRAPH_USE_REORDER_KEY}},
{HGRAPH_BASE_QUANTIZATION_TYPE,
{HGRAPH_BASE_CODES_KEY, QUANTIZATION_PARAMS_KEY, QUANTIZATION_TYPE_KEY}},
{HGRAPH_BASE_IO_TYPE, {HGRAPH_BASE_CODES_KEY, IO_PARAMS_KEY, IO_TYPE_KEY}},
{HGRAPH_PRECISE_IO_TYPE, {HGRAPH_PRECISE_CODES_KEY, IO_PARAMS_KEY, IO_TYPE_KEY}},
{HGRAPH_BASE_FILE_PATH, {HGRAPH_BASE_CODES_KEY, IO_PARAMS_KEY, IO_FILE_PATH}},
{HGRAPH_PRECISE_FILE_PATH, {HGRAPH_PRECISE_CODES_KEY, IO_PARAMS_KEY, IO_FILE_PATH}},
{HGRAPH_PRECISE_QUANTIZATION_TYPE,
{HGRAPH_PRECISE_CODES_KEY, QUANTIZATION_PARAMS_KEY, QUANTIZATION_TYPE_KEY}},
{HGRAPH_GRAPH_MAX_DEGREE, {HGRAPH_GRAPH_KEY, GRAPH_PARAM_MAX_DEGREE}},
Expand All @@ -44,14 +48,16 @@ static const std::string HGRAPH_PARAMS_TEMPLATE =
"{HGRAPH_USE_REORDER_KEY}": false,
"{HGRAPH_GRAPH_KEY}": {
"{IO_PARAMS_KEY}": {
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}"
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}",
"{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}"
},
"{GRAPH_PARAM_MAX_DEGREE}": 64,
"{GRAPH_PARAM_INIT_MAX_CAPACITY}": 100
},
"{HGRAPH_BASE_CODES_KEY}": {
"{IO_PARAMS_KEY}": {
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}"
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}",
"{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}"
},
"codes_type": "flatten_codes",
"{QUANTIZATION_PARAMS_KEY}": {
Expand All @@ -62,7 +68,8 @@ static const std::string HGRAPH_PARAMS_TEMPLATE =
},
"{HGRAPH_PRECISE_CODES_KEY}": {
"{IO_PARAMS_KEY}": {
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}"
"{IO_TYPE_KEY}": "{IO_TYPE_VALUE_BLOCK_MEMORY_IO}",
"{IO_FILE_PATH}": "{DEFAULT_FILE_PATH_VALUE}"
},
"codes_type": "flatten_codes",
"{QUANTIZATION_PARAMS_KEY}": {
Expand Down
6 changes: 6 additions & 0 deletions src/inner_string_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ const char* const IO_PARAMS_KEY = "io_params";
// IO type
const char* const IO_TYPE_KEY = "type";
const char* const IO_TYPE_VALUE_MEMORY_IO = "memory_io";
const char* const IO_TYPE_VALUE_BUFFER_IO = "buffer_io";
const char* const IO_TYPE_VALUE_BLOCK_MEMORY_IO = "block_memory_io";
const char* const BLOCK_IO_BLOCK_SIZE_KEY = "block_size";
const char* const IO_FILE_PATH = "file_path";
const char* const DEFAULT_FILE_PATH_VALUE = "/tmp/default_file_path";

// quantization params key
const char* const QUANTIZATION_PARAMS_KEY = "quantization_params";
Expand Down Expand Up @@ -76,6 +79,9 @@ const std::unordered_map<std::string, std::string> DEFAULT_MAP = {
{"BUILD_PARAMS_KEY", BUILD_PARAMS_KEY},
{"BUILD_THREAD_COUNT", BUILD_THREAD_COUNT},
{"BUILD_EF_CONSTRUCTION", BUILD_EF_CONSTRUCTION},
{"IO_TYPE_VALUE_BUFFER_IO", IO_TYPE_VALUE_BUFFER_IO},
{"IO_FILE_PATH", IO_FILE_PATH},
{"DEFAULT_FILE_PATH_VALUE", DEFAULT_FILE_PATH_VALUE},
};

} // namespace vsag
1 change: 1 addition & 0 deletions src/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set (IO_SRC
io_parameter.cpp
memory_io_parameter.cpp
memory_block_io_parameter.cpp
buffer_io_parameter.cpp
)

add_library (io OBJECT ${IO_SRC})
133 changes: 133 additions & 0 deletions src/io/buffer_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <utility>

#include "basic_io.h"
#include "buffer_io_parameter.h"
#include "index/index_common_param.h"

namespace vsag {

class BufferIO : public BasicIO<BufferIO> {
public:
explicit BufferIO() = default;

BufferIO(std::string filename, Allocator* allocator)
: filepath_(std::move(filename)), allocator_(allocator) {
this->fd_ = open(filepath_.c_str(), O_CREAT | O_RDWR, 0644);
}

explicit BufferIO(const BufferIOParameterPtr& io_param, const IndexCommonParam& common_param)
: BufferIO(io_param->path_, common_param.allocator_.get()){};

explicit BufferIO(const IOParamPtr& param, const IndexCommonParam& common_param)
: BufferIO(std::dynamic_pointer_cast<BufferIOParameter>(param), common_param){};

~BufferIO() override = default;

inline void
WriteImpl(const uint8_t* data, uint64_t size, uint64_t offset) const {
auto ret = pwrite64(this->fd_, data, size, offset);
if (ret != size) {
throw std::runtime_error(fmt::format("write bytes {} less than {}", ret, size));
}
}

inline bool
ReadImpl(uint64_t size, uint64_t offset, uint8_t* data) const {
auto ret = pread64(this->fd_, data, size, offset);
if (ret != size) {
throw std::runtime_error(fmt::format("read bytes {} less than {}", ret, size));
}
return true;
}

[[nodiscard]] inline const uint8_t*
DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const {
need_release = true;
auto* buf = reinterpret_cast<uint8_t*>(allocator_->Allocate(size));
ReadImpl(size, offset, buf);
return buf;
}

inline void
ReleaseImpl(const uint8_t* data) const {
auto ptr = const_cast<uint8_t*>(data);
allocator_->Deallocate(ptr);
}

inline bool
MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const {
bool ret = true;
for (uint64_t i = 0; i < count; ++i) {
ret &= ReadImpl(sizes[i], offsets[i], datas);
datas += sizes[i];
}
return ret;
}

inline void
PrefetchImpl(uint64_t offset, uint64_t cache_line = 64){};

inline void
SerializeImpl(StreamWriter& writer) {
struct stat file_stat;
fstat(fd_, &file_stat);
StreamWriter::WriteObj(writer, file_stat.st_size);

auto block_size = Options::Instance().block_size_limit();
auto* buf = reinterpret_cast<uint8_t*>(allocator_->Allocate(block_size));
off64_t offset = 0;
auto read_bytes = pread64(this->fd_, buf, block_size, offset);
while (read_bytes > 0) {
writer.Write(reinterpret_cast<char*>(buf), read_bytes);
offset += read_bytes;
read_bytes = pread64(this->fd_, buf, block_size, offset);
}
this->allocator_->Deallocate(buf);
}

inline void
DeserializeImpl(StreamReader& reader) {
uint64_t file_size;
StreamReader::ReadObj(reader, file_size);
auto block_size = Options::Instance().block_size_limit();
auto* buf = reinterpret_cast<uint8_t*>(allocator_->Allocate(block_size));
off64_t offset = 0;
while (file_size > 0) {
auto read_bytes = std::min(block_size, file_size);
reader.Read(reinterpret_cast<char*>(buf), read_bytes);
auto ret = pwrite64(this->fd_, buf, read_bytes, offset);
if (ret != read_bytes) {
throw std::runtime_error(
fmt::format("write bytes {} less than {}", ret, read_bytes));
}
offset += read_bytes;
file_size -= read_bytes;
}
this->allocator_->Deallocate(buf);
}

private:
std::string filepath_{};

Allocator* allocator_{nullptr};

int fd_{-1};
};
} // namespace vsag
43 changes: 43 additions & 0 deletions src/io/buffer_io_parameter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "buffer_io_parameter.h"

#include "inner_string_params.h"

namespace vsag {

BufferIOParameter::BufferIOParameter() : IOParameter(IO_TYPE_VALUE_BUFFER_IO) {
}

BufferIOParameter::BufferIOParameter(const vsag::JsonType& json)
: IOParameter(IO_TYPE_VALUE_BUFFER_IO) {
this->FromJson(json);
}

void
BufferIOParameter::FromJson(const JsonType& json) {
CHECK_ARGUMENT(json.contains(IO_FILE_PATH), "miss file_path param in buffer io type");
this->path_ = json[IO_FILE_PATH];
}

JsonType
BufferIOParameter::ToJson() {
JsonType json;
json[IO_TYPE_KEY] = IO_TYPE_VALUE_BUFFER_IO;
json[IO_FILE_PATH] = this->path_;
return json;
}
} // namespace vsag
39 changes: 39 additions & 0 deletions src/io/buffer_io_parameter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "io_parameter.h"

namespace vsag {
class BufferIOParameter : public IOParameter {
public:
BufferIOParameter();

explicit BufferIOParameter(const JsonType& json);

void
FromJson(const JsonType& json) override;

JsonType
ToJson() override;

public:
std::string path_{};
};

using BufferIOParameterPtr = std::shared_ptr<BufferIOParameter>;

} // namespace vsag
60 changes: 60 additions & 0 deletions src/io/buffer_io_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "buffer_io.h"

#include <catch2/catch_test_macros.hpp>
#include <memory>

#include "basic_io_test.h"
#include "safe_allocator.h"

using namespace vsag;

TEST_CASE("read&write", "[ut][buffer_io]") {
fixtures::TempDir dir("buffer_io");
auto path = dir.GenerateRandomFile();
auto allocator = SafeAllocator::FactoryDefaultAllocator();
auto io = std::make_unique<BufferIO>(path, allocator.get());
TestBasicReadWrite(*io);
}

TEST_CASE("param", "[ut][buffer_io]") {
fixtures::TempDir dir("buffer_io");
auto path = dir.GenerateRandomFile();
auto allocator = SafeAllocator::FactoryDefaultAllocator();
std::string param_str = R"(
{{
"type": "buffer_io",
"file_path" : "{}"
}}
)";
auto json = JsonType::parse(fmt::format(param_str, path));
auto io_param = IOParameter::GetIOParameterByJson(json);
IndexCommonParam common_param;
common_param.allocator_ = allocator;
auto io = std::make_unique<BufferIO>(io_param, common_param);
TestBasicReadWrite(*io);
}

TEST_CASE("serialize&deserialize", "[ut][buffer_io]") {
auto allocator = SafeAllocator::FactoryDefaultAllocator();
fixtures::TempDir dir("buffer_io");
auto path1 = dir.GenerateRandomFile();
auto path2 = dir.GenerateRandomFile();
auto wio = std::make_unique<BufferIO>(path1, allocator.get());
auto rio = std::make_unique<BufferIO>(path2, allocator.get());
TestSerializeAndDeserialize(*wio, *rio);
}
1 change: 1 addition & 0 deletions src/io/io_headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
#pragma once

#include "basic_io.h"
#include "buffer_io.h"
#include "memory_block_io.h"
#include "memory_io.h"
Loading

0 comments on commit 4a703c8

Please sign in to comment.