From bc0f66aa77a4c54309b8e15f3e33076f2d3dedc0 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 24 Feb 2025 14:36:59 -0800 Subject: [PATCH] Add download + indexOuput#write implementation to RemoteIndexBuildStrategy Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../remote/RemoteIndexBuildStrategy.java | 23 ++++-- .../index/store/IndexOutputWithBuffer.java | 46 ++++++++++- .../remote/RemoteIndexBuildStrategyTests.java | 80 ++++++++++++++++++- 4 files changed, 142 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70ec3ae510..84b62b4624 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD) ### Features +* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554) ### Enhancements ### Bug Fixes ### Infrastructure diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java index 8555e2ad68..3a7b641335 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java @@ -5,16 +5,20 @@ package org.opensearch.knn.index.codec.nativeindex.remote; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang.NotImplementedException; import org.apache.lucene.index.SegmentWriteState; import org.opensearch.common.StopWatch; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.IndexSettings; import org.opensearch.knn.common.featureflags.KNNFeatureFlags; import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy; import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams; +import org.opensearch.knn.index.store.IndexOutputWithBuffer; import org.opensearch.knn.index.vectorvalues.KNNVectorValues; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -22,6 +26,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.io.InputStream; import java.util.function.Supplier; import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING; @@ -37,6 +42,7 @@ public class RemoteIndexBuildStrategy implements NativeIndexBuildStrategy { private final Supplier repositoriesServiceSupplier; private final NativeIndexBuildStrategy fallbackStrategy; + private static final String VECTOR_BLOB_FILE_EXTENSION = ".knnvec"; private static final String DOC_ID_FILE_EXTENSION = ".knndid"; @@ -93,12 +99,12 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException { log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName()); stopWatch = new StopWatch().start(); - awaitVectorBuild(); + BlobPath downloadPath = awaitVectorBuild(); time_in_millis = stopWatch.stop().totalTime().millis(); log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName()); stopWatch = new StopWatch().start(); - readFromRepository(); + readFromRepository(downloadPath, indexInfo.getIndexOutputWithBuffer()); time_in_millis = stopWatch.stop().totalTime().millis(); log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName()); } catch (Exception e) { @@ -155,15 +161,22 @@ private void submitVectorBuild() { /** * Wait on remote vector build to complete + * @return BlobPath The path from which we should perform download */ - private void awaitVectorBuild() { + private BlobPath awaitVectorBuild() throws NotImplementedException { throw new NotImplementedException(); } /** * Read constructed vector file from remote repository and write to IndexOutput */ - private void readFromRepository() { - throw new NotImplementedException(); + @VisibleForTesting + void readFromRepository(BlobPath downloadPath, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException { + BlobContainer blobContainer = getRepository().blobStore().blobContainer(downloadPath.parent()); + // TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and + // requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464 + String fileName = downloadPath.toArray()[downloadPath.toArray().length - 1]; + InputStream graphStream = blobContainer.readBlob(fileName); + indexOutputWithBuffer.writeFromStreamWithBuffer(graphStream); } } diff --git a/src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java b/src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java index c1420238ae..b9f98f6490 100644 --- a/src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java +++ b/src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java @@ -8,7 +8,12 @@ import org.apache.lucene.store.IndexOutput; import java.io.IOException; +import java.io.InputStream; +/** + * Wrapper around {@link IndexOutput} to perform writes in a buffered manner. This class is created per flush/merge, and may be used twice if + * {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} needs to fall back to a different build strategy. + */ public class IndexOutputWithBuffer { // Underlying `IndexOutput` obtained from Lucene's Directory. private IndexOutput indexOutput; @@ -16,10 +21,12 @@ public class IndexOutputWithBuffer { // Allocating 64KB here since it show better performance in NMSLIB with the size. (We had slightly improvement in FAISS than having 4KB) // NMSLIB writes an adjacent list size first, then followed by serializing the list. Since we usually have more adjacent lists, having // 64KB to accumulate bytes as possible to reduce the times of calling `writeBytes`. - private byte[] buffer = new byte[64 * 1024]; + static final int CHUNK_SIZE = 64 * 1024; + private final byte[] buffer; public IndexOutputWithBuffer(IndexOutput indexOutput) { this.indexOutput = indexOutput; + this.buffer = new byte[CHUNK_SIZE]; } // This method will be called in JNI layer which precisely knows @@ -33,6 +40,43 @@ public void writeBytes(int length) { } } + /** + * Writes to the {@link IndexOutput} by buffering bytes into the existing buffer in this class. + * + * @param inputStream The stream from which we are reading bytes to write + * @throws IOException + * @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream, byte[]) + */ + public void writeFromStreamWithBuffer(InputStream inputStream) throws IOException { + writeFromStreamWithBuffer(inputStream, this.buffer); + } + + /** + * Writes to the {@link IndexOutput} by buffering bytes with @param outputBuffer. This method allows + * {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} to provide a separate, larger buffer as that buffer is for buffering + * bytes downloaded from the repository, so it may be more performant to use a larger buffer. + * We do not change the size of the existing buffer in case a fallback to the existing build strategy is needed. + * TODO: Tune the size of the buffer used by RemoteIndexBuildStrategy based on benchmarking + * + * @param inputStream The stream from which we are reading bytes to write + * @param outputBuffer The buffer used to buffer bytes + * @throws IOException + * @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream) + */ + public void writeFromStreamWithBuffer(InputStream inputStream, byte[] outputBuffer) throws IOException { + int bytesRead = 0; + // InputStream uses -1 indicates there are no more bytes to be read + while (bytesRead != -1) { + // Try to read CHUNK_SIZE into the buffer. The actual amount read may be less. + bytesRead = inputStream.read(outputBuffer, 0, CHUNK_SIZE); + assert bytesRead <= CHUNK_SIZE; + // However many bytes we read, write it to the IndexOutput if != -1 + if (bytesRead != -1) { + indexOutput.writeBytes(outputBuffer, 0, bytesRead); + } + } + } + @Override public String toString() { return "{indexOutput=" + indexOutput + ", len(buffer)=" + buffer.length + "}"; diff --git a/src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java b/src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java index 1589021f67..17c8201cf3 100644 --- a/src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java +++ b/src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java @@ -5,7 +5,18 @@ package org.opensearch.knn.index.codec.nativeindex.remote; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.junit.Before; import org.mockito.Mockito; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy; import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams; @@ -16,17 +27,21 @@ import org.opensearch.knn.index.vectorvalues.TestVectorValues; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; +import java.util.Random; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING; -public class RemoteIndexBuildStrategyTests extends OpenSearchTestCase { +public class RemoteIndexBuildStrategyTests extends KNNTestCase { static int fallbackCounter = 0; @@ -38,6 +53,16 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException { } } + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + ClusterSettings clusterSettings = mock(ClusterSettings.class); + when(clusterSettings.get(KNN_REMOTE_VECTOR_REPO_SETTING)).thenReturn("test-repo-name"); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + KNNSettings.state().setClusterService(clusterService); + } + public void testFallback() throws IOException { List vectorValues = List.of(new float[] { 1, 2 }, new float[] { 2, 3 }, new float[] { 3, 4 }); final TestVectorValues.PreDefinedFloatVectorValues randomVectorValues = new TestVectorValues.PreDefinedFloatVectorValues( @@ -64,4 +89,55 @@ public void testFallback() throws IOException { objectUnderTest.buildAndWriteIndex(buildIndexParams); assertEquals(1, fallbackCounter); } + + /** + * Verify the buffered read method in {@link RemoteIndexBuildStrategy#readFromRepository} produces the correct result + */ + public void testRepositoryRead() throws IOException { + // Create an InputStream with random values + int TEST_ARRAY_SIZE = 64 * 1024 * 10; + byte[] byteArray = new byte[TEST_ARRAY_SIZE]; + Random random = new Random(); + random.nextBytes(byteArray); + InputStream randomStream = new ByteArrayInputStream(byteArray); + + // Create a test segment that we will read/write from + Directory directory; + directory = newFSDirectory(createTempDir()); + String TEST_SEGMENT_NAME = "test-segment-name"; + IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT); + IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer(testIndexOutput); + + // Set up RemoteIndexBuildStrategy and write to IndexOutput + RepositoriesService repositoriesService = mock(RepositoriesService.class); + BlobStoreRepository mockRepository = mock(BlobStoreRepository.class); + BlobPath testBasePath = new BlobPath().add("testBasePath"); + BlobStore mockBlobStore = mock(BlobStore.class); + AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class); + + when(repositoriesService.repository(any())).thenReturn(mockRepository); + when(mockRepository.basePath()).thenReturn(testBasePath); + when(mockRepository.blobStore()).thenReturn(mockBlobStore); + when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer); + when(mockBlobContainer.readBlob("testFile")).thenReturn(randomStream); + + RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy( + () -> repositoriesService, + mock(NativeIndexBuildStrategy.class) + ); + // This should read from randomStream into testIndexOutput + BlobPath testPath = new BlobPath().add("testBasePath").add("testDirectory").add("testFile"); + objectUnderTest.readFromRepository(testPath, testIndexOutputWithBuffer); + testIndexOutput.close(); + + // Now try to read from the IndexOutput + IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT); + byte[] resultByteArray = new byte[TEST_ARRAY_SIZE]; + testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE); + assertArrayEquals(byteArray, resultByteArray); + + // Test Cleanup + testIndexInput.close(); + directory.close(); + } }