From b5cd3f5d2a5d04bfca8e96e3563bde2b90ee812b Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 23 Jul 2024 08:42:47 -0700 Subject: [PATCH] Add perms for remote snapshot cache eviction on scripted query (#14411) (#14885) (cherry picked from commit 90d5500ecbf13b08d2f6a9fa6ad67119acd37a17) Signed-off-by: Finn Carroll Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- CHANGELOG.md | 1 + .../store/remote/utils/TransferManager.java | 74 +++++++++---------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1ba93a0fafdf..68d13c2c18f0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891)) - Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385)) - Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754)) +- Fix searchable snapshot failure with scripted fields ([#14411](https://github.com/opensearch-project/OpenSearch/pull/14411)) ### Security diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index df26f2f0925f6..f07c4832d982c 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -64,16 +64,22 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio final Path key = blobFetchRequest.getFilePath(); logger.trace("fetchBlob called for {}", key.toString()); - final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { - logger.trace("Transfer Manager - IndexInput closed or not in cache"); - // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - } else { - logger.trace("Transfer Manager - Already in cache"); - // already in the cache and ready to be used (open) - return cachedIndexInput; - } + // We need to do a privileged action here in order to fetch from remote + // and write/evict from local file cache in case this is invoked as a side + // effect of a plugin (such as a scripted search) that doesn't have the + // necessary permissions. + final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction) () -> { + return fileCache.compute(key, (path, cachedIndexInput) -> { + if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); + // Doesn't exist or is closed, either way create a new one + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + } else { + logger.trace("Transfer Manager - Already in cache"); + // already in the cache and ready to be used (open) + return cachedIndexInput; + } + }); }); // Cache entry was either retrieved from the cache or newly added, either @@ -88,37 +94,31 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio @SuppressWarnings("removal") private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { - // We need to do a privileged action here in order to fetch from remote - // and write to the local file cache in case this is invoked as a side - // effect of a plugin (such as a scripted search) that doesn't have the - // necessary permissions. - return AccessController.doPrivileged((PrivilegedAction) () -> { - try { - if (Files.exists(request.getFilePath()) == false) { - logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); - try ( - OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); - OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) - ) { - for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { - try ( - InputStream snapshotFileInputStream = streamReader.read( - blobPart.getBlobName(), - blobPart.getPosition(), - blobPart.getLength() - ); - ) { - snapshotFileInputStream.transferTo(localFileOutputStream); - } + try { + if (Files.exists(request.getFilePath()) == false) { + logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); + try ( + OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); + OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) + ) { + for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { + try ( + InputStream snapshotFileInputStream = streamReader.read( + blobPart.getBlobName(), + blobPart.getPosition(), + blobPart.getLength() + ); + ) { + snapshotFileInputStream.transferTo(localFileOutputStream); } } } - final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); - return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput); - } catch (IOException e) { - throw new UncheckedIOException(e); } - }); + final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); + return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } /**