diff --git a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
index 393030e5c0..1118c2cba0 100644
--- a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
+++ b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
@@ -29,8 +29,8 @@
  * stored to a consistent location of "{prefix}/{filename}".
  */
 public class BlobStore {
-  private final String bucketName;
-  private final S3AsyncClient s3AsyncClient;
+  protected final String bucketName;
+  protected final S3AsyncClient s3AsyncClient;
   private final S3TransferManager transferManager;
 
   public BlobStore(S3AsyncClient s3AsyncClient, String bucketName) {
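The only functional change to BlobStore is visibility: bucketName and s3AsyncClient widen from private to protected, so the new streaming classes below (which live in the same com.slack.astra.blobfs package, where this also grants access) can issue their own S3 requests against the chunk bucket. As an illustrative fragment, assuming a blobStore, chunkId, and objectName in scope, this is the kind of ranged request S3IndexInput builds from those fields:

    // Illustrative only: a ranged GET addressing an object at "{chunkId}/{objectName}".
    GetObjectRequest request =
        GetObjectRequest.builder()
            .bucket(blobStore.bucketName) // reachable now that the field is not private
            .key(chunkId + "/" + objectName)
            .range("bytes=0-1023") // Range is inclusive: this asks for the first 1024 bytes
            .build();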
diff --git a/astra/src/main/java/com/slack/astra/blobfs/S3IndexInput.java b/astra/src/main/java/com/slack/astra/blobfs/S3IndexInput.java
new file mode 100644
index 0000000000..9e76edb373
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/blobfs/S3IndexInput.java
@@ -0,0 +1,224 @@
+package com.slack.astra.blobfs;
+
+import static com.slack.astra.util.SizeConstant.MB;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tag;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.store.IndexInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+
+/**
+ * Implementation of the Lucene IndexInput, supporting partial reading of files from S3. Instead
+ * of attempting to download an entire file to disk, this class pages in bytes at the time of
+ * read, caching the results in memory.
+ */
+public class S3IndexInput extends IndexInput {
+  private static final Logger LOG = LoggerFactory.getLogger(S3IndexInput.class);
+  protected static final String PAGE_COUNTER = "astra_s3_index_input_pagein_counter";
+
+  public static final String ASTRA_S3_STREAMING_PAGESIZE = "astra.s3Streaming.pageSize";
+  protected static final long PAGE_SIZE =
+      Long.parseLong(System.getProperty(ASTRA_S3_STREAMING_PAGESIZE, String.valueOf(2 * MB)));
+
+  private final BlobStore blobStore;
+  private final S3AsyncClient s3AsyncClient;
+  private final String chunkId;
+  private final String objectName;
+  private final Map<Long, byte[]> cachedData = new HashMap<>();
+
+  // pointer for the next byte read within this input
+  private long filePointer = 0;
+  private Long fileLength;
+
+  // variables set if the input has been sliced
+  private final long sliceOffset;
+  private Long sliceLength = null;
+
+  private S3IndexInput(
+      String resourceDescription,
+      BlobStore blobStore,
+      S3AsyncClient s3AsyncClient,
+      String chunkId,
+      String objectName,
+      Long sliceOffset,
+      Map<Long, byte[]> cachedData,
+      Long length) {
+    super(resourceDescription);
+    this.blobStore = blobStore;
+    this.s3AsyncClient = s3AsyncClient;
+    this.chunkId = chunkId;
+    this.objectName = objectName;
+
+    this.filePointer = 0;
+    this.sliceOffset = sliceOffset;
+
+    this.cachedData.putAll(cachedData);
+    this.sliceLength = length;
+  }
+
+  public S3IndexInput(
+      BlobStore blobStore, String resourceDescription, String chunkId, String objectName) {
+    super(resourceDescription);
+    this.blobStore = blobStore;
+    this.s3AsyncClient = blobStore.s3AsyncClient;
+    this.chunkId = chunkId;
+    this.objectName = objectName;
+    this.sliceOffset = 0;
+  }
+
+  /**
+   * Reads data from the cache, else pages in data for the given page offset. The offset is a
+   * multiple of the page size, where pageKey 0 covers bytes {0} to {pageSize}, pageKey 1 covers
+   * bytes {pageSize} to {2*pageSize}, etc.
+   *
+   * @param pageKey the page offset to download
+   */
+  private byte[] getData(long pageKey) throws ExecutionException, InterruptedException {
+    if (cachedData.containsKey(pageKey)) {
+      return cachedData.get(pageKey);
+    } else {
+      Metrics.counter(PAGE_COUNTER, List.of(Tag.of("chunkId", chunkId))).increment();
+      cachedData.clear();
+
+      long readFrom = pageKey * PAGE_SIZE;
+      long readTo = Math.min((pageKey + 1) * PAGE_SIZE, Long.MAX_VALUE);
+
+      Stopwatch timeDownload = Stopwatch.createStarted();
+      byte[] response =
+          s3AsyncClient
+              .getObject(
+                  GetObjectRequest.builder()
+                      .bucket(blobStore.bucketName)
+                      .key(String.format("%s/%s", chunkId, objectName))
+                      .range(String.format("bytes=%s-%s", readFrom, readTo))
+                      .build(),
+                  AsyncResponseTransformer.toBytes())
+              .get()
+              .asByteArray();
+
+      LOG.debug(
+          "Downloaded {} - byte length {} in {} ms for chunk {}",
+          objectName,
+          response.length,
+          timeDownload.elapsed(TimeUnit.MILLISECONDS),
+          chunkId);
+      cachedData.put(pageKey, response);
+      return response;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // nothing to close/cleanup
+  }
+
+  /** Returns the current position in this file, where the next read will occur. */
+  @Override
+  public long getFilePointer() {
+    return filePointer;
+  }
+
+  /**
+   * Sets the current position in this file, where the next read will occur. If the position is
+   * beyond the end of the file, an EOFException is thrown and the stream is left in an
+   * undetermined state.
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos > length()) {
+      throw new EOFException();
+    }
+    filePointer = pos;
+  }
+
+  /** The number of bytes in the file. This value is cached to the fileLength variable. */
+  @Override
+  public long length() {
+    if (sliceLength != null) {
+      return sliceLength;
+    }
+
+    if (fileLength == null) {
+      try {
+        fileLength =
+            s3AsyncClient
+                .headObject(
+                    HeadObjectRequest.builder()
+                        .bucket(blobStore.bucketName)
+                        .key(String.format("%s/%s", chunkId, objectName))
+                        .build())
+                .get()
+                .contentLength();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Error reading length", e);
+        throw new RuntimeException(e);
+      }
+    }
+    return fileLength;
+  }
+
+  /**
+   * Creates a slice of this index input, with the given description, offset, and length. The
+   * slice is sought to the beginning.
+   */
+  @Override
+  public IndexInput slice(String sliceDescription, long offset, long length) {
+    LOG.debug(
+        "Slicing {} for chunk ID {}, offset {} length {}", objectName, chunkId, offset, length);
+    return new S3IndexInput(
+        sliceDescription,
+        blobStore,
+        s3AsyncClient,
+        chunkId,
+        objectName,
+        offset,
+        cachedData,
+        length);
+  }
+
+  /** Reads and returns a single byte, paging in data if required. */
+  @Override
+  public byte readByte() {
+    try {
+      long getCacheKey = Math.floorDiv(filePointer + sliceOffset, PAGE_SIZE);
+      int byteArrayPos = Math.toIntExact(filePointer + sliceOffset - (getCacheKey * PAGE_SIZE));
+      filePointer++;
+      return getData(getCacheKey)[byteArrayPos];
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Error reading byte", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads a specified number of bytes into an array at the specified offset.
+   *
+   * @param b the array to read bytes into
+   * @param offset the offset in the array to start storing bytes
+   * @param len the number of bytes to read
+   */
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    for (int i = 0; i < len; i++) {
+      b[offset + i] = readByte();
+    }
+  }
+
+  @VisibleForTesting
+  protected Map<Long, byte[]> getCachedData() {
+    return new HashMap<>(cachedData);
+  }
+}
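To make the read path above concrete, the sketch below (hypothetical class and method names, not part of this change) mirrors the arithmetic readByte uses: the absolute position is the file pointer plus any slice offset, the page key is the floor division of that position by the page size, and the index into the cached page is the remainder. One detail worth noting: the HTTP Range header getData sends is inclusive at both ends, so each request can return PAGE_SIZE + 1 bytes; reads index at most PAGE_SIZE - 1 into a page, so the extra trailing byte is simply unused.

    // Sketch only: mirrors S3IndexInput's page math with the default 2 MiB page size.
    public class PageMathSketch {
      static final long PAGE_SIZE = 2L * 1024 * 1024; // default astra.s3Streaming.pageSize

      // Maps an absolute read position to {page to fetch, index within that page}.
      static long[] locate(long filePointer, long sliceOffset) {
        long absolute = filePointer + sliceOffset;
        long pageKey = Math.floorDiv(absolute, PAGE_SIZE);
        int offsetInPage = Math.toIntExact(absolute - pageKey * PAGE_SIZE);
        return new long[] {pageKey, offsetInPage};
      }

      public static void main(String[] args) {
        // Last byte of page 0, then the first byte of page 1.
        System.out.println(java.util.Arrays.toString(locate(PAGE_SIZE - 1, 0))); // [0, 2097151]
        System.out.println(java.util.Arrays.toString(locate(PAGE_SIZE, 0))); // [1, 0]
      }
    }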
diff --git a/astra/src/main/java/com/slack/astra/blobfs/S3RemoteDirectory.java b/astra/src/main/java/com/slack/astra/blobfs/S3RemoteDirectory.java
new file mode 100644
index 0000000000..e014da07d0
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/blobfs/S3RemoteDirectory.java
@@ -0,0 +1,104 @@
+package com.slack.astra.blobfs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal implementation of a Lucene Directory, supporting only the functions that are required
+ * to support cache nodes reading data directly from S3.
+ */
+public class S3RemoteDirectory extends Directory {
+  private static final Logger LOG = LoggerFactory.getLogger(S3RemoteDirectory.class);
+
+  private final BlobStore blobStore;
+  private final String chunkId;
+
+  private List<String> files = null;
+
+  public S3RemoteDirectory(String chunkId, BlobStore blobStore) {
+    this.chunkId = chunkId;
+    this.blobStore = blobStore;
+  }
+
+  @Override
+  public String[] listAll() {
+    if (files == null) {
+      files =
+          blobStore.listFiles(chunkId).stream()
+              .map(
+                  fullPath -> {
+                    String[] parts = fullPath.split("/");
+                    return parts[parts.length - 1];
+                  })
+              .toList();
+      LOG.debug(
+          "listed files for chunkId - {}, listResults - {}", chunkId, String.join(",", files));
+    }
+    return files.toArray(String[]::new);
+  }
+
+  @Override
+  public void deleteFile(String name) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public long fileLength(String name) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void sync(Collection<String> names) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void syncMetaData() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void rename(String source, String dest) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) {
+    return new S3IndexInput(blobStore, name, chunkId, name);
+  }
+
+  @Override
+  public Lock obtainLock(String name) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("Closing s3 remote directory");
+  }
+
+  @Override
+  public Set<String> getPendingDeletions() {
+    throw new NotImplementedException();
+  }
+}
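Because only the read path is implemented (everything else throws NotImplementedException), any standard Lucene reader can sit on top of this directory. A minimal hedged sketch of a consumer, assuming an existing BlobStore and chunkId plus an import of org.apache.lucene.index.DirectoryReader; this is illustrative, not code from this change:

    // Hedged sketch: count the documents in a chunk without downloading it first.
    static int countDocs(BlobStore blobStore, String chunkId) throws IOException {
      try (Directory directory = new S3RemoteDirectory(chunkId, blobStore);
          DirectoryReader reader = DirectoryReader.open(directory)) {
        return reader.numDocs(); // every underlying read becomes an S3 ranged GET
      }
    }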
diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
index 0320c0dfca..80cf29b311 100644
--- a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
+++ b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
@@ -50,8 +50,9 @@
  * BlobFs.
  */
 public class ReadOnlyChunkImpl<T> implements Chunk<T> {
-
   private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyChunkImpl.class);
+  public static final String ASTRA_S3_STREAMING_FLAG = "astra.s3Streaming.enabled";
+
   private ChunkInfo chunkInfo;
   private LogIndexSearcher<T> logSearcher;
   private SearchMetadata searchMetadata;
@@ -86,6 +87,9 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
 
   private final ReentrantLock chunkAssignmentLock = new ReentrantLock();
 
+  private static final boolean USE_S3_STREAMING =
+      Boolean.parseBoolean(System.getProperty(ASTRA_S3_STREAMING_FLAG, "false"));
+
   public ReadOnlyChunkImpl(
       AsyncCuratorFramework curatorFramework,
       MeterRegistry meterRegistry,
@@ -219,38 +223,50 @@ public void downloadChunkData() {
     chunkAssignmentLock.lock();
     try {
       CacheNodeAssignment assignment = getCacheNodeAssignment();
-      // get data directory
-      dataDirectory =
-          Path.of(String.format("%s/astra-chunk-%s", dataDirectoryPrefix, assignment.assignmentId));
+      this.chunkInfo = ChunkInfo.fromSnapshotMetadata(snapshotMetadata);
 
-      if (Files.isDirectory(dataDirectory)) {
-        try (Stream<Path> files = Files.list(dataDirectory)) {
-          if (files.findFirst().isPresent()) {
-            LOG.warn("Existing files found in slot directory, clearing directory");
-            cleanDirectory();
+      if (USE_S3_STREAMING) {
+        this.chunkSchema = ChunkSchema.deserializeBytes(blobStore.getSchema(chunkInfo.chunkId));
+        this.logSearcher =
+            (LogIndexSearcher<T>)
+                new LogIndexSearcherImpl(
+                    LogIndexSearcherImpl.searcherManagerFromChunkId(chunkInfo.chunkId, blobStore),
+                    chunkSchema.fieldDefMap);
+      } else {
+        // get data directory
+        dataDirectory =
+            Path.of(
+                String.format("%s/astra-chunk-%s", dataDirectoryPrefix, assignment.assignmentId));
+
+        if (Files.isDirectory(dataDirectory)) {
+          try (Stream<Path> files = Files.list(dataDirectory)) {
+            if (files.findFirst().isPresent()) {
+              LOG.warn("Existing files found in slot directory, clearing directory");
+              cleanDirectory();
+            }
           }
         }
-      }
 
-      blobStore.download(snapshotMetadata.snapshotId, dataDirectory);
-      try (Stream<Path> fileList = Files.list(dataDirectory)) {
-        if (fileList.findAny().isEmpty()) {
-          throw new IOException("No files found on blob storage, released slot for re-assignment");
+        blobStore.download(snapshotMetadata.snapshotId, dataDirectory);
+        try (Stream<Path> fileList = Files.list(dataDirectory)) {
+          if (fileList.findAny().isEmpty()) {
+            throw new IOException(
+                "No files found on blob storage, released slot for re-assignment");
+          }
         }
-      }
 
-      Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
-      if (!Files.exists(schemaPath)) {
-        throw new RuntimeException("We expect a schema.json file to exist within the index");
-      }
-      this.chunkSchema = ChunkSchema.deserializeFile(schemaPath);
+        Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
+        if (!Files.exists(schemaPath)) {
+          throw new RuntimeException("We expect a schema.json file to exist within the index");
+        }
+        this.chunkSchema = ChunkSchema.deserializeFile(schemaPath);
 
-      this.chunkInfo = ChunkInfo.fromSnapshotMetadata(snapshotMetadata);
-      this.logSearcher =
-          (LogIndexSearcher<T>)
-              new LogIndexSearcherImpl(
-                  LogIndexSearcherImpl.searcherManagerFromPath(dataDirectory),
-                  chunkSchema.fieldDefMap);
+        this.logSearcher =
+            (LogIndexSearcher<T>)
+                new LogIndexSearcherImpl(
+                    LogIndexSearcherImpl.searcherManagerFromPath(dataDirectory),
+                    chunkSchema.fieldDefMap);
+      }
 
       // set chunk state
       cacheNodeAssignmentStore.updateAssignmentState(
@@ -262,11 +278,18 @@ public void downloadChunkData() {
       registerSearchMetadata(searchMetadataStore, searchContext, snapshotMetadata.name);
       long durationNanos = assignmentTimer.stop(chunkAssignmentTimerSuccess);
 
-      LOG.info(
-          "Downloaded chunk with snapshot id '{}' in {} seconds, was {}",
-          snapshotMetadata.snapshotId,
-          TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
-          FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
+      if (USE_S3_STREAMING) {
+        LOG.info(
+            "Downloaded chunk with snapshot id '{}' in {} seconds, astra.s3Streaming.enabled is set",
+            snapshotMetadata.snapshotId,
+            TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS));
+      } else {
+        LOG.info(
+            "Downloaded chunk with snapshot id '{}' in {} seconds, was {}",
+            snapshotMetadata.snapshotId,
+            TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
+            FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
+      }
     } catch (Exception e) {
       // if any error occurs during the chunk assignment, try to release the slot for
       // re-assignment, disregarding any errors
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
index 4d0630f757..030dee047e 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
@@ -61,7 +61,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
   private CacheNodeAssignmentStore cacheNodeAssignmentStore;
   private CacheNodeMetadataStore cacheNodeMetadataStore;
 
-  private ExecutorService executorService =
+  private final ExecutorService executorService =
       Executors.newCachedThreadPool(
           new ThreadFactoryBuilder().setNameFormat("caching-chunk-manager-%d").build());
 
diff --git a/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java b/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java
index 1c0d74f3fb..788c6f3201 100644
--- a/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java
+++ b/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java
@@ -7,6 +7,8 @@
 import brave.Tracing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.slack.astra.blobfs.BlobStore;
+import com.slack.astra.blobfs.S3RemoteDirectory;
 import com.slack.astra.logstore.LogMessage;
 import com.slack.astra.logstore.LogMessage.SystemField;
 import com.slack.astra.logstore.LogWireMessage;
@@ -35,6 +37,7 @@
 import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TopFieldCollector;
 import org.apache.lucene.search.TopFieldDocs;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MMapDirectory;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.search.aggregations.AggregatorFactories;
@@ -63,6 +66,13 @@ public class LogIndexSearcherImpl implements LogIndexSearcher<LogMessage> {
           System.getProperty("astra.query.useOpenSearchAggregationParsing", "false"));
   ;
 
+  @VisibleForTesting
+  public static SearcherManager searcherManagerFromChunkId(String chunkId, BlobStore blobStore)
+      throws IOException {
+    Directory directory = new S3RemoteDirectory(chunkId, blobStore);
+    return new SearcherManager(directory, null);
+  }
+
   @VisibleForTesting
   public static SearcherManager searcherManagerFromPath(Path path) throws IOException {
     MMapDirectory directory = new MMapDirectory(path);
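Both toggles are read once into static finals, so they must be set before ReadOnlyChunkImpl and S3IndexInput are first loaded; in practice that means JVM flags on the cache node, e.g. -Dastra.s3Streaming.enabled=true -Dastra.s3Streaming.pageSize=4194304. A test could do the same programmatically, provided it runs before class initialization (illustrative sketch):

    // Sketch only: property names come from the constants added above; the 4 MiB value is arbitrary.
    System.setProperty("astra.s3Streaming.enabled", "true"); // ReadOnlyChunkImpl.ASTRA_S3_STREAMING_FLAG
    System.setProperty("astra.s3Streaming.pageSize", String.valueOf(4L * 1024 * 1024));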
diff --git a/astra/src/main/java/com/slack/astra/util/SizeConstant.java b/astra/src/main/java/com/slack/astra/util/SizeConstant.java
new file mode 100644
index 0000000000..2d6355a8a5
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/util/SizeConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.slack.astra.util;
+
+/** Constants for common sizes. */
+public final class SizeConstant {
+
+  /** 1 Kibibyte. */
+  public static final long KB = 1024;
+
+  /** 1 Mebibyte. */
+  public static final long MB = 1024 * KB;
+
+  /** 1 Gibibyte. */
+  public static final long GB = 1024 * MB;
+
+  private SizeConstant() {}
+}
diff --git a/astra/src/test/java/com/slack/astra/blobfs/S3IndexInputTest.java b/astra/src/test/java/com/slack/astra/blobfs/S3IndexInputTest.java
new file mode 100644
index 0000000000..5dc3f2fdbc
--- /dev/null
+++ b/astra/src/test/java/com/slack/astra/blobfs/S3IndexInputTest.java
@@ -0,0 +1,85 @@
+package com.slack.astra.blobfs;
+
+import static com.slack.astra.testlib.MetricsUtil.getCount;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+
+import com.adobe.testing.s3mock.junit5.S3MockExtension;
+import io.micrometer.core.instrument.Metrics;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+
+class S3IndexInputTest {
+  private static final String TEST_BUCKET = "testBucket";
+
+  @RegisterExtension
+  public static final S3MockExtension S3_MOCK_EXTENSION =
+      S3MockExtension.builder()
+          .silent()
+          .withInitialBuckets(TEST_BUCKET)
+          .withSecureConnection(false)
+          .build();
+
+  private final S3AsyncClient s3Client =
+      S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint());
+
+  @Test
+  public void shouldPageInContents() throws IOException {
+    BlobStore blobStore = spy(new BlobStore(s3Client, TEST_BUCKET));
+    String resourceDescription = "resource";
+    String chunkId = UUID.randomUUID().toString();
+
+    // for test purposes we'll write two pages worth of data
+    // first page is all "1", second is all "2"
+    Path directoryUpload = Files.createTempDirectory("");
+    Path tempFile = Files.createTempFile(directoryUpload, "example", "");
+    byte[] testData = new byte[2 * Math.toIntExact(S3IndexInput.PAGE_SIZE)];
+    for (int i = 0; i < S3IndexInput.PAGE_SIZE; i++) {
+      testData[i] = 1;
+    }
+    for (int i = 0; i < S3IndexInput.PAGE_SIZE; i++) {
+      testData[Math.toIntExact(i + S3IndexInput.PAGE_SIZE)] = 2;
+    }
+    Files.write(tempFile, testData);
+    blobStore.upload(chunkId, directoryUpload);
+
+    try (S3IndexInput s3IndexInput =
+        new S3IndexInput(
+            blobStore, resourceDescription, chunkId, tempFile.getFileName().toString())) {
+      assertThat(s3IndexInput.getCachedData().size()).isZero();
+
+      // read in a single byte, and ensure that it paged in a single page worth of contents
+      byte readByte = s3IndexInput.readByte();
+      assertThat(readByte).isEqualTo((byte) 1);
+      assertThat(s3IndexInput.getFilePointer()).isEqualTo(1);
+      await()
+          .until(
+              () -> getCount(S3IndexInput.PAGE_COUNTER, Metrics.globalRegistry),
+              (counter) -> counter == 1);
+
+      // bulk read the remainder of the page, and ensure that it is still using the cached data
+      byte[] bulkRead = new byte[Math.toIntExact(S3IndexInput.PAGE_SIZE - 1)];
+      s3IndexInput.readBytes(bulkRead, 0, Math.toIntExact(S3IndexInput.PAGE_SIZE - 1));
+      assertThat(getCount(S3IndexInput.PAGE_COUNTER, Metrics.globalRegistry)).isEqualTo(1);
+      assertThat(bulkRead[bulkRead.length - 1]).isEqualTo((byte) 1);
+      assertTrue(s3IndexInput.getCachedData().containsKey(0L));
+
+      // read in a single byte more, which will trigger another page load
+      // ensure that the page was loaded as expected
+      byte readByteNextPage = s3IndexInput.readByte();
+      assertThat(readByteNextPage).isEqualTo((byte) 2);
+      assertTrue(s3IndexInput.getCachedData().containsKey(1L));
+      await()
+          .until(
+              () -> getCount(S3IndexInput.PAGE_COUNTER, Metrics.globalRegistry),
+              (counter) -> counter == 2);
+    }
+  }
+}
diff --git a/astra/src/test/java/com/slack/astra/blobfs/S3RemoteDirectoryTest.java b/astra/src/test/java/com/slack/astra/blobfs/S3RemoteDirectoryTest.java
new file mode 100644
index 0000000000..611342b4cc
--- /dev/null
+++ b/astra/src/test/java/com/slack/astra/blobfs/S3RemoteDirectoryTest.java
@@ -0,0 +1,31 @@
+package com.slack.astra.blobfs;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+
+class S3RemoteDirectoryTest {
+
+  @Test
+  public void shouldListFileNamesOnly() throws IOException {
+    String chunkId = "chunkId";
+    String bucketName = "bucketName";
+    S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
+    BlobStore blobStore = spy(new BlobStore(s3AsyncClient, bucketName));
+
+    doReturn(List.of("foo/bar.example")).when(blobStore).listFiles(any());
+
+    try (S3RemoteDirectory s3RemoteDirectory = new S3RemoteDirectory(chunkId, blobStore)) {
+      String[] filesArray = s3RemoteDirectory.listAll();
+      assertThat(filesArray.length).isEqualTo(1);
+      assertThat(filesArray[0]).isEqualTo("bar.example");
+    }
+  }
+}
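One behavior the tests above do not exercise is slicing, which Lucene relies on when reading compound files: a slice created by S3IndexInput.slice re-bases every read by its offset, reports the given length as its own, and starts from a copy of the parent's page cache. A hedged sketch of the expected behavior, with illustrative names and an object assumed to be at least 150 bytes long:

    // Sketch only: reads through a 50-byte slice that starts at absolute offset 100.
    IndexInput whole = new S3IndexInput(blobStore, "whole-file", chunkId, objectName);
    IndexInput middle = whole.slice("middle", 100, 50);
    assertThat(middle.length()).isEqualTo(50);
    byte b = middle.readByte(); // absolute byte 100: page Math.floorDiv(100 + 0, PAGE_SIZE) = 0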