Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Cluster State] Remote state interfaces #10

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

/**
* An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject<T> {

public static final String PATH_DELIMITER = "/";

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public AbstractRemoteBlobStoreObject(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(ThreadPool.Names.GENERIC);
}

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getFullBlobName();

public String getBlobFileName() {
if (getFullBlobName() == null) {
generateBlobFileName();
}
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];
}

public abstract String generateBlobFileName();

public abstract UploadedMetadata getUploadedMetadata();

@Override
public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener) {
return () -> {
assert get() != null;
// TODO add implementation
};
}

@Override
public T read() throws IOException {
assert getFullBlobName() != null;
return deserialize(
transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName()));
}

@Override
public void readAsync(ActionListener<T> listener) {
executorService.execute(() -> {
try {
listener.onResponse(read());
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public BlobPath getBlobPathForUpload() {
BlobPath blobPath = blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID());
for (String token : getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

public BlobPath getBlobPathForDownload() {
String[] pathTokens = extractBlobPathTokens(getFullBlobName());
BlobPath blobPath = blobStoreRepository.basePath();
for (String token : pathTokens) {
blobPath = blobPath.add(token);
}
return blobPath;
}

protected Compressor getCompressor() {
return blobStoreRepository.getCompressor();
}

protected BlobStoreRepository getBlobStoreRepository() {
return this.blobStoreRepository;
}

private static String[] extractBlobPathTokens(String blobName) {
String[] blobNameTokens = blobName.split(PATH_DELIMITER);
return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1);
}

private static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.util.List;

public class BlobPathParameters {

private List<String> pathTokens;
private String filePrefix;

public BlobPathParameters(List<String> pathTokens, String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

public List<String> getPathTokens() {
return pathTokens;
}

public String getFilePrefix() {
return filePrefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.io.IOException;
import java.io.InputStream;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;

/**
* An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type.
* @param <T> The object type which can be upload to or download from remote storage.
*/
public interface RemoteObject <T> {
public T get();
public String clusterUUID();
public InputStream serialize() throws IOException;
public T deserialize(InputStream inputStream) throws IOException;

public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener);
public T read() throws IOException;
public void readAsync(ActionListener<T> listener);

}
Loading