From 8944e416eb215383483f5623394dbcb3186038c9 Mon Sep 17 00:00:00 2001 From: Huisheng Liu Date: Sun, 25 Feb 2024 13:37:42 -0800 Subject: [PATCH] add wait() method to AlignedFileReader --- include/aligned_file_reader.h | 5 +++++ src/pq_flash_index.cpp | 37 +++++------------------------------ 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/include/aligned_file_reader.h b/include/aligned_file_reader.h index f5e2af5c3..f39d5da39 100644 --- a/include/aligned_file_reader.h +++ b/include/aligned_file_reader.h @@ -117,4 +117,9 @@ class AlignedFileReader // process batch of aligned requests in parallel // NOTE :: blocking call virtual void read(std::vector &read_reqs, IOContext &ctx, bool async = false) = 0; + +#ifdef USE_BING_INFRA + // wait for completion of one request in a batch of requests + virtual void wait(IOContext &ctx, int &completedIndex) = 0; +#endif }; diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index 33867d4be..3ec70b163 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -1140,37 +1140,10 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons } #ifdef USE_BING_INFRA -bool getNextCompletedRequest(const IOContext &ctx, size_t size, int &completedIndex) -{ - bool waitsRemaining = false; - long completeCount = ctx.m_completeCount; - do - { - for (int i = 0; i < size; i++) - { - auto ithStatus = (*ctx.m_pRequestsStatus)[i]; - if (ithStatus == IOContext::Status::READ_SUCCESS) - { - completedIndex = i; - return true; - } - else if (ithStatus == IOContext::Status::READ_WAIT) - { - waitsRemaining = true; - } - } - - // if we didn't find one in READ_SUCCESS, wait for one to complete. - if (waitsRemaining) - { - WaitOnAddress(&ctx.m_completeCount, &completeCount, sizeof(completeCount), 100); - // this assumes the knowledge of the reader behavior (implicit - // contract). need better factoring? - } - } while (waitsRemaining); - - completedIndex = -1; - return false; +bool getNextCompletedRequest(std::shared_ptr &reader, + IOContext &ctx, int &completedIndex) { + reader->wait(ctx, completedIndex); + return completedIndex != -1; } #endif @@ -1476,7 +1449,7 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t long requestCount = static_cast(frontier_read_reqs.size()); // If we issued read requests and if a read is complete or there are // reads in wait state, then enter the while loop. - while (requestCount > 0 && getNextCompletedRequest(ctx, requestCount, completedIndex)) + while (requestCount > 0 && getNextCompletedRequest(reader, ctx, completedIndex)) { assert(completedIndex >= 0); auto &frontier_nhood = frontier_nhoods[completedIndex];