Skip to content

Commit

Permalink
Add perms for remote snapshot cache eviction on scripted query (#14411)…
Browse files Browse the repository at this point in the history
… (#14885)

(cherry picked from commit 90d5500)

Signed-off-by: Finn Carroll <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent c770738 commit b5cd3f5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891))
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))
- Fix searchable snapshot failure with scripted fields ([#14411](https://github.com/opensearch-project/OpenSearch/pull/14411))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,22 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
final Path key = blobFetchRequest.getFilePath();
logger.trace("fetchBlob called for {}", key.toString());

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
// We need to do a privileged action here in order to fetch from remote
// and write/evict from local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> {
return fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});
});

// Cache entry was either retrieved from the cache or newly added, either
Expand All @@ -88,37 +94,31 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio

@SuppressWarnings("removal")
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
// We need to do a privileged action here in order to fetch from remote
// and write to the local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
return AccessController.doPrivileged((PrivilegedAction<FileCachedIndexInput>) () -> {
try {
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = streamReader.read(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
try {
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = streamReader.read(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
Expand Down

0 comments on commit b5cd3f5

Please sign in to comment.