Skip to content

Commit

Permalink
Logs extension (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
alonre24 authored Jul 25, 2023
1 parent cc14a65 commit 3f6360b
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 19 deletions.
11 changes: 8 additions & 3 deletions src/VecSim/algorithms/hnsw/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,8 @@ idType HNSWIndex<DataType, DistType>::mutuallyConnectNewElement(
// lock.
if (new_node_level_data.numLinks == max_M_cur) {
// The new node cannot add more neighbors
this->log("Couldn't add all chosen neighbors upon inserting a new node");
this->log(VecSimCommonStrings::LOG_DEBUG_STRING,
"Couldn't add all chosen neighbors upon inserting a new node");
unlockNodeLinks(new_node_level);
unlockNodeLinks(neighbor_graph_data);
break;
Expand Down Expand Up @@ -1350,6 +1351,8 @@ template <typename DataType, typename DistType>
void HNSWIndex<DataType, DistType>::resizeIndexCommon(size_t new_max_elements) {
assert(new_max_elements % this->blockSize == 0 &&
"new_max_elements must be a multiple of blockSize");
this->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
"Updating HNSW index capacity from %zu to %zu", this->maxElements, new_max_elements);
resizeLabelLookup(new_max_elements);
visitedNodesHandlerPool.resize(new_max_elements);
idToMetaData.resize(new_max_elements);
Expand Down Expand Up @@ -1445,7 +1448,8 @@ void HNSWIndex<DataType, DistType>::mutuallyUpdateForRepairedNode(
for (auto chosen_id : chosen_neighbors) {
if (node_neighbors_idx == max_M_cur) {
// Cannot add more new neighbors, we reached the capacity.
this->log("Couldn't add all the chosen new nodes upon updating %u, as we reached the"
this->log(VecSimCommonStrings::LOG_DEBUG_STRING,
"Couldn't add all the chosen new nodes upon updating %u, as we reached the"
" maximum number of neighbors per node",
node_id);
break;
Expand Down Expand Up @@ -1803,7 +1807,8 @@ AddVectorCtx HNSWIndex<DataType, DistType>::storeNewElement(labelType label,
try {
new (cur_egd) ElementGraphData(state.elementMaxLevel, levelDataSize, this->allocator);
} catch (std::runtime_error &e) {
this->log("Error - allocating memory for new element failed due to low memory");
this->log(VecSimCommonStrings::LOG_WARNING_STRING,
"Error - allocating memory for new element failed due to low memory");
throw e;
}

Expand Down
3 changes: 2 additions & 1 deletion src/VecSim/algorithms/hnsw/hnsw_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
try {
new (tmpData) ElementGraphData(toplevel, this->levelDataSize, this->allocator);
} catch (std::runtime_error &e) {
this->log("Error - allocating memory for new element failed due to low memory");
this->log(VecSimCommonStrings::LOG_WARNING_STRING,
"Error - allocating memory for new element failed due to low memory");
throw e;
}
// Add the current element to the current block, and update cur_egt to point to it.
Expand Down
7 changes: 7 additions & 0 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
}
void runGC() override {
// Run no more than pendingSwapJobsThreshold value jobs.
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
"running asynchronous GC for tiered HNSW index");
this->executeReadySwapJobs(this->pendingSwapJobsThreshold);
}
#ifdef BUILD_TESTS
Expand Down Expand Up @@ -286,6 +288,9 @@ void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs(size_t maxJobsToR

// Execute swap jobs - acquire hnsw write lock.
this->mainIndexGuard.lock();
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
"Tiered HNSW index GC: there are %zu ready swap jobs. Start executing %zu swap jobs",
readySwapJobs, MIN(readySwapJobs, maxJobsToRun));

vecsim_stl::vector<idType> idsToRemove(this->allocator);
idsToRemove.reserve(idToSwapJob.size());
Expand All @@ -304,6 +309,8 @@ void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs(size_t maxJobsToR
idToSwapJob.erase(id);
}
readySwapJobs -= idsToRemove.size();
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
"Tiered HNSW index GC: done executing %zu swap jobs", idsToRemove.size());
this->mainIndexGuard.unlock();
}

Expand Down
6 changes: 6 additions & 0 deletions src/VecSim/utils/vec_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ const char *VecSimCommonStrings::BACKEND_INDEX_STRING = "BACKEND_INDEX";
const char *VecSimCommonStrings::TIERED_HNSW_SWAP_JOBS_THRESHOLD_STRING =
"TIERED_HNSW_SWAP_JOBS_THRESHOLD";

// Log levels
const char *VecSimCommonStrings::LOG_DEBUG_STRING = "debug";
const char *VecSimCommonStrings::LOG_VERBOSE_STRING = "verbose";
const char *VecSimCommonStrings::LOG_NOTICE_STRING = "notice";
const char *VecSimCommonStrings::LOG_WARNING_STRING = "warning";

void sort_results_by_id(VecSimQueryResult_List rl) {
qsort(rl.results, VecSimQueryResult_Len(rl), sizeof(VecSimQueryResult),
(__compar_fn_t)cmpVecSimQueryResultById);
Expand Down
6 changes: 6 additions & 0 deletions src/VecSim/utils/vec_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ struct VecSimCommonStrings {
static const char *FRONTEND_INDEX_STRING;
static const char *BACKEND_INDEX_STRING;
static const char *TIERED_HNSW_SWAP_JOBS_THRESHOLD_STRING;

// Log levels
static const char *LOG_DEBUG_STRING;
static const char *LOG_VERBOSE_STRING;
static const char *LOG_NOTICE_STRING;
static const char *LOG_WARNING_STRING;
};

inline int cmpVecSimQueryResultById(const VecSimQueryResult *res1, const VecSimQueryResult *res2) {
Expand Down
3 changes: 2 additions & 1 deletion src/VecSim/vec_sim_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,10 @@ typedef int (*timeoutCallbackFunction)(void *ctx);
/**
* @brief A struct to pass 3rd party logging function to Vecsimlib.
* @param ctx some generic context to pass to the function
* @param level loglevel (in redis we should choose from: "warning", "notice", "verbose", "debug")
* @param message the message to log
*/
typedef void (*logCallbackFunction)(void *ctx, const char *message);
typedef void (*logCallbackFunction)(void *ctx, const char *level, const char *message);

typedef enum {
VecSim_QueryResult_OK = VecSim_OK,
Expand Down
4 changes: 2 additions & 2 deletions src/VecSim/vec_sim_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
return results;
}

void log(const char *fmt, ...) const {
void log(const char *level, const char *fmt, ...) const {
if (VecSimIndexInterface::logCallback) {
// Format the message and call the callback
va_list args;
Expand All @@ -128,7 +128,7 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
va_start(args, fmt);
vsnprintf(buf, len + 1, fmt, args);
va_end(args);
logCallback(this->logCallbackCtx, buf);
logCallback(this->logCallbackCtx, level, buf);
delete[] buf;
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/VecSim/vec_sim_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <iostream>

// Print log messages to stdout
void Vecsim_Log(void *ctx, const char *message) { std::cout << message << std::endl; }
void Vecsim_Log(void *ctx, const char *level, const char *message) {
std::cout << level << ": " << message << std::endl;
}

timeoutCallbackFunction VecSimIndexInterface::timeoutCallback = [](void *ctx) { return 0; };
logCallbackFunction VecSimIndexInterface::logCallback = Vecsim_Log;
Expand Down
6 changes: 4 additions & 2 deletions src/VecSim/vec_sim_tiered_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <shared_mutex>

#define TIERED_LOG this->backendIndex->log

/**
* Definition of generic job structure for asynchronous tiered index.
*/
Expand Down Expand Up @@ -255,9 +257,9 @@ VecSimTieredIndex<DataType, DistType>::rangeQuery(const void *queryBlob, double
template <typename DataType, typename DistType>
VecSimIndexInfo VecSimTieredIndex<DataType, DistType>::info() const {
VecSimIndexInfo info;
this->flatIndexGuard.lock();
this->flatIndexGuard.lock_shared();
VecSimIndexInfo frontendInfo = this->frontendIndex->info();
this->flatIndexGuard.unlock();
this->flatIndexGuard.unlock_shared();

this->mainIndexGuard.lock();
VecSimIndexInfo backendInfo = this->backendIndex->info();
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ add_executable(test_hnsw_parallel test_hnsw_parallel.cpp test_utils.cpp)
add_executable(test_bruteforce test_bruteforce.cpp test_bruteforce_multi.cpp test_utils.cpp)
add_executable(test_allocator test_allocator.cpp test_utils.cpp)
add_executable(test_spaces test_spaces.cpp)
add_executable(test_common test_common.cpp test_utils.cpp)
add_executable(test_common ../utils/mock_thread_pool.cpp test_utils.cpp test_common.cpp)

target_link_libraries(test_hnsw PUBLIC gtest_main VectorSimilarity)
target_link_libraries(test_hnsw_parallel PUBLIC gtest_main VectorSimilarity)
Expand Down
52 changes: 45 additions & 7 deletions tests/unit/test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "VecSim/utils/vecsim_results_container.h"
#include "VecSim/algorithms/hnsw/hnsw.h"
#include "VecSim/index_factories/hnsw_factory.h"
#include "mock_thread_pool.h"
#include "VecSim/index_factories/tiered_factory.h"

#include <cstdlib>
#include <limits>
Expand Down Expand Up @@ -446,13 +448,13 @@ struct logCtx {
std::string prefix;
};

void test_log_impl(void *ctx, const char *message) {
void test_log_impl(void *ctx, const char *level, const char *message) {
logCtx *log = (logCtx *)ctx;
std::string msg = log->prefix + message;
std::string msg = std::string(level) + ": " + log->prefix + message;
log->logBuffer.push_back(msg);
}

TEST(CommonAPITest, testlog) {
TEST(CommonAPITest, testlogBasic) {

logCtx log;
log.prefix = "test log prefix: ";
Expand All @@ -464,12 +466,48 @@ TEST(CommonAPITest, testlog) {
dynamic_cast<BruteForceIndex<float, float> *>(BruteForceFactory::NewIndex(&params));
VecSim_SetLogCallbackFunction(test_log_impl);

index->log("test log message no fmt");
index->log("test log message %s %s", "with", "args");
index->log(VecSimCommonStrings::LOG_NOTICE_STRING, "test log message no fmt");
index->log(VecSimCommonStrings::LOG_WARNING_STRING, "test log message %s %s", "with", "args");

ASSERT_EQ(log.logBuffer.size(), 2);
ASSERT_EQ(log.logBuffer[0], "test log prefix: test log message no fmt");
ASSERT_EQ(log.logBuffer[1], "test log prefix: test log message with args");
ASSERT_EQ(log.logBuffer[0], "notice: test log prefix: test log message no fmt");
ASSERT_EQ(log.logBuffer[1], "warning: test log prefix: test log message with args");

VecSimIndex_Free(index);
}

TEST(CommonAPITest, testlogTieredIndex) {
logCtx log;
log.prefix = "tiered prefix: ";
VecSim_SetLogCallbackFunction(test_log_impl);

HNSWParams params_raw = {.type = VecSimType_FLOAT32, .dim = 4, .metric = VecSimMetric_L2};
VecSimParams hnsw_params = {.algo = VecSimAlgo_HNSWLIB,
.algoParams = {.hnswParams = HNSWParams{params_raw}},
.logCtx = &log};
auto mock_thread_pool = tieredIndexMock();
TieredIndexParams tiered_params = {.jobQueue = &mock_thread_pool.jobQ,
.jobQueueCtx = mock_thread_pool.ctx,
.submitCb = tieredIndexMock::submit_callback,
.flatBufferLimit = DEFAULT_BLOCK_SIZE,
.primaryIndexParams = &hnsw_params,
.specificParams = {TieredHNSWParams{.swapJobThreshold = 1}}};

auto *tiered_index =
reinterpret_cast<TieredHNSWIndex<float, float> *>(TieredFactory::NewIndex(&tiered_params));
mock_thread_pool.ctx->index_strong_ref.reset(tiered_index);

GenerateAndAddVector<float>(tiered_index, 4, 1);
mock_thread_pool.thread_iteration();
tiered_index->deleteVector(1);
ASSERT_EQ(log.logBuffer.size(), 4);
ASSERT_EQ(log.logBuffer[0],
"verbose: " + log.prefix + "Updating HNSW index capacity from 0 to 1024");
ASSERT_EQ(log.logBuffer[1],
"verbose: " + log.prefix +
"Tiered HNSW index GC: there are 1 ready swap jobs. Start executing 1 swap jobs");
ASSERT_EQ(log.logBuffer[2],
"verbose: " + log.prefix + "Updating HNSW index capacity from 1024 to 0");
ASSERT_EQ(log.logBuffer[3],
"verbose: " + log.prefix + "Tiered HNSW index GC: done executing 1 swap jobs");
}
2 changes: 1 addition & 1 deletion tests/unit/test_hnsw_tiered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2889,7 +2889,7 @@ TYPED_TEST(HNSWTieredIndexTest, switchWriteModes) {
.metric = VecSimMetric_L2,
.multi = TypeParam::isMulti(),
.M = 32,
.efRuntime = 2 * n};
.efRuntime = 3 * n};
VecSimParams hnsw_params = CreateParams(params);
auto mock_thread_pool = tieredIndexMock();

Expand Down

0 comments on commit 3f6360b

Please sign in to comment.