
[Remote Vector Index Build] Add vector data upload implementation to RemoteIndexBuildStrategy #2550

Merged. 1 commit merged into opensearch-project:main from remote-vector-dev on Feb 26, 2025.

Conversation


@jed326 jed326 commented Feb 21, 2025

Description

This PR adds the implementation and tests for uploading vector and doc ID blobs to a remote repository via RemoteIndexBuildStrategy. It also introduces a new knn.remote_index_build.size_threshold setting that controls the size threshold above which the remote index build service is used.


Vector Upload Details

For a detailed explanation on the vector uploads, please see the Parallel Blob Upload section from the LLD: #2465

At a high level, the flow is as follows (currently only repository-s3 supports parallel blob uploads via this interface, so the steps below reference S3 directly):

  1. The k-NN plugin provides an InputStream supplier to asyncBlobUpload (see: RemoteIndexBuildStrategy#getTransferPartStreamSupplier).
  2. repository-s3 determines the number of parts (N) to split the blob into based on repository settings. By default we use 16 MB parts.
  3. The repository uses the supplier from [1] to create N VectorValueInputStreams, each representing only a specific part of the underlying KNNVectorValues.
  4. repository-s3 performs a multipart upload request with each stream as an upload part. The number of parts uploaded in parallel depends on the upload priority and thread pool configuration in the repository settings.

Because KNNVectorValues is an iterator, we need to create N instances of it; otherwise, iterating through all of the vectors would still happen sequentially, which would greatly reduce throughput. VectorValueInputStream takes a constructor parameter to position the head of the input stream at a specific byte offset in the KNNVectorValues, including offsets within a given vector. For example, for doc ID X the vector could be read partially by InputStream part 1 and partially by InputStream part 2.
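The offset arithmetic described above can be sketched as follows. This is a hypothetical illustration only: the class and method names are assumptions, not the plugin's actual API. Given fixed-size vectors, a part's starting byte offset maps to a vector index plus a byte offset within that vector:

```java
// Hypothetical sketch of the part-positioning arithmetic described above.
// Class and method names are illustrative, not the plugin's actual classes.
public class PartOffsets {
    /**
     * For part `partIndex` of an upload split into parts of `partSizeBytes`,
     * return { index of the vector the part starts in, byte offset within that vector }.
     */
    public static long[] partStart(long partIndex, long partSizeBytes, int bytesPerVector) {
        long byteOffset = partIndex * partSizeBytes;        // absolute start of this part
        long vectorIndex = byteOffset / bytesPerVector;     // which vector it lands in
        long offsetInVector = byteOffset % bytesPerVector;  // position inside that vector
        return new long[] { vectorIndex, offsetInVector };
    }
}
```

For example, with 384-dimensional float vectors (1536 bytes each) and 16 MB parts, part 1 begins 1024 bytes into vector 10922, which is exactly the mid-vector split case described above.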


This PR DOES NOT include:

For reference, this is the blob path being used:

<BASE_PATH>/<index UUID>_vectors/<UUID>_<field name>_<segment name>.knnvec|.knndid

Related Issues

Relates #2465
Relates #2392
Relates #2391

Check List

  • [x] New functionality includes testing.
  • [ ] New functionality has been documented.
  • [ ] API changes companion pull request created.
  • [x] Commits are signed per the DCO using --signoff.
  • [ ] Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@jed326

jed326 commented Feb 21, 2025

@jmazanec15 @shatejas @Vikasht34 @navneet1v could you please help review? thanks!


@navneet1v navneet1v left a comment


@jed326 can you please add the flow of functions used by the repository service for the InputStream? Since there are some overridden functions, it's hard to follow which function is called when.

It would also be good if you could add a package-info.java explaining the flow and usage of the InputStream in more detail.

}

bytesRemaining--;
return currentBuffer.get() & 0xFF;
Collaborator

Why do we need the & here?


@jed326 jed326 Feb 21, 2025


We technically don't (I added a code comment explaining this); however, I think it's safer to keep it here. IntelliJ also warns if you don't do the conversion, and all the base InputStream implementations do the same.

I also added some tests to verify that read() and read(byte[] b, int offset, int length) read the same data over a given stream.
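For background on the discussion above: Java's byte is signed, so widening it to int without masking can collide with InputStream's end-of-stream sentinel. A minimal illustration (not the PR's code):

```java
// Illustrative only: why read() should mask with 0xFF.
// InputStream.read() must return an int in [0, 255], or -1 for end-of-stream.
// The byte 0xFF widens to the int -1, which a caller would misread as EOF.
public class MaskExample {
    public static int widenedDirectly(byte b) {
        return b; // 0xFF becomes -1: indistinguishable from end-of-stream
    }

    public static int widenedWithMask(byte b) {
        return b & 0xFF; // 0xFF becomes 255: the correct unsigned value
    }
}
```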

@navneet1v

@jed326 let's avoid using the skip-changelog tag; update the changelog instead.

@jed326 jed326 force-pushed the remote-vector-dev branch 2 times, most recently from b3c4c85 to af72c3b (February 24, 2025 19:04)
@jed326 jed326 changed the title Add vector data upload implementation to RemoteIndexBuildStrategy [Remote Vector Index Build] Add vector data upload implementation to RemoteIndexBuildStrategy Feb 24, 2025
@jed326 jed326 force-pushed the remote-vector-dev branch 2 times, most recently from a8063b5 to 8f996e3 (February 25, 2025 04:18)

@jmazanec15 jmazanec15 left a comment


Overall, looks good to me. A few minor comments, but nothing blocking.

StopWatch stopWatch;
long time_in_millis;
try {
stopWatch = new StopWatch().start();
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
Member

What makes this time based?


jmazanec15 previously approved these changes Feb 25, 2025

@shatejas shatejas left a comment


Looks good overall. A few things related to maintenance and UTs


assertTrue(asyncUpload.get());
assertTrue(upload.get());
verify(mockBlobStore).blobContainer(any());
Collaborator

Since you already have a testBasePath, can you replace any() with it?

Contributor Author

Unfortunately, BlobPath path = getRepository().basePath().add(indexSettings.getUUID() + VECTORS_PATH); creates a new BlobPath instance instead of modifying the existing one, so I would need to mock BlobPath itself as well as set up all the mocks for IndexSettings. Since we're using our own custom blobContainer implementation here, I don't think that's necessary for this test.


@navneet1v navneet1v left a comment


Overall, the code looks good to me. Please address @shatejas's comments, especially those related to creating the repo/data-access class.

@jed326

jed326 commented Feb 26, 2025

Thanks @navneet1v @shatejas @jmazanec15, I've added a new VectorRepositoryAccessor interface with a default implementation, DefaultVectorRepositoryAccessor, to handle all of the KNNVectorValues -> InputStream conversions. Please take another look whenever you get a chance, thanks!

.expectedChecksum(null)
.build();

AtomicReference<Exception> exception = new AtomicReference<>();
Collaborator

Since we are dealing with callbacks and an async flow, a better option than AtomicReference would be CompletableFuture.

Contributor Author

I looked into this some more, and I think the existing implementation is still preferable to CompletableFuture. The reason is that asyncBlobUpload does not throw an exception on upload failures; instead, it propagates the failure to whatever ActionListener is passed into it.

If we wanted to do this with CompletableFuture without an AtomicReference, we would need to throw the exception within the ActionListener, which is not correct behavior, as the listener is not intended to interrupt execution flows.
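For context on this exchange, the standard way to bridge the two styles is to complete a future from inside the listener rather than throwing from it. The sketch below is a generic adapter for illustration; the ActionListener interface here is a simplified stand-in, not OpenSearch's actual class:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative sketch: bridge a callback-style listener to a CompletableFuture
// without throwing from inside the listener.
public class ListenerFutureAdapter {
    // Simplified stand-in for a listener interface; not OpenSearch's ActionListener.
    public interface ActionListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    public static <T> CompletableFuture<T> toFuture(Consumer<ActionListener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new ActionListener<T>() {
            @Override public void onResponse(T response) { future.complete(response); }
            @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
        });
        return future;
    }
}
```

The listener never throws; success and failure both flow into the future, and the caller decides whether to block (join) or compose further stages.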

Collaborator

This is how it should work.

CompletableFuture<Void> vectorUploadFuture;

if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
    log.info("Using parallel upload for repository: {}", repository);
    vectorUploadFuture = CompletableFuture.runAsync(() -> {
        try {
            uploadVectorsAsync(blobContainer, blobName, vectorBlobLength, vectorDataType, knnVectorValuesSupplier);
        } catch (IOException e) {
            throw new RuntimeException("Vector upload failed", e);
        }
    });
} else {
    log.info("Using sequential upload for repository: {}", repository);
    vectorUploadFuture = CompletableFuture.runAsync(() -> {
        try {
            uploadVectorsSequentially(blobContainer, blobName, vectorBlobLength, vectorDataType, knnVectorValues);
        } catch (IOException e) {
            throw new RuntimeException("Sequential vector upload failed", e);
        }
    });
}

CompletableFuture<Void> docIdUploadFuture = CompletableFuture.runAsync(() -> {
    try {
        writeDocIds(knnVectorValues, totalLiveDocs, blobName, blobContainer);
    } catch (IOException e) {
        throw new RuntimeException("Doc ID upload failed", e);
    }
});

// Wait for both tasks to complete
CompletableFuture.allOf(vectorUploadFuture, docIdUploadFuture).join();


@Override
public int read(byte[] b, int off, int len) throws IOException {
if (currentBuffer == null) {
Collaborator

nit: Can we have a boolean instead of making currentBuffer null, like an end-of-stream flag?

Contributor Author

I'm not against using a boolean, but I think it's important to set currentBuffer to null to enforce that we do not read extra bytes out of the buffer. An NPE here would then mean a fatal/unrecoverable error in the remote upload process, so we want to fail and fall back to the fallbackStrategy. Without setting currentBuffer to null, I think we're at risk of reading extra bytes.
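The null-as-end-of-stream convention being discussed can be illustrated with a minimal stream. This is a hypothetical class for illustration, not the PR's VectorValuesInputStream:

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Minimal illustration of the pattern under discussion: the buffer reference
// is nulled out at end-of-stream so any further buffered access fails fast
// (with an NPE) rather than silently returning stale bytes.
public class NullSentinelStream extends InputStream {
    private ByteBuffer currentBuffer;

    public NullSentinelStream(byte[] data) {
        this.currentBuffer = ByteBuffer.wrap(data);
    }

    @Override
    public int read() {
        if (currentBuffer == null) {
            return -1; // stream exhausted
        }
        int value = currentBuffer.get() & 0xFF; // mask to return an unsigned value
        if (!currentBuffer.hasRemaining()) {
            currentBuffer = null; // mark end of stream
        }
        return value;
    }
}
```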


@shatejas shatejas left a comment


Looks good, no major concerns as such.


@Vikasht34 Vikasht34 left a comment


LGTM

@jmazanec15 jmazanec15 merged commit 5873add into opensearch-project:main Feb 26, 2025
35 checks passed