Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.cloudfoundry.multiapps.common.SLException;
Expand All @@ -29,7 +30,7 @@
import org.cloudfoundry.multiapps.controller.core.util.ApplicationInstanceNameUtil;
import org.cloudfoundry.multiapps.controller.persistence.services.DatabaseHealthService;
import org.cloudfoundry.multiapps.controller.persistence.services.DatabaseMonitoringService;
import org.cloudfoundry.multiapps.controller.persistence.services.ObjectStoreFileStorage;
import org.cloudfoundry.multiapps.controller.persistence.services.FileStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -46,7 +47,7 @@ public class ApplicationHealthCalculator {
// timeout
private static final int TOTAL_TASK_TIMEOUT_IN_SECONDS = 3 * SINGLE_TASK_TIMEOUT_IN_SECONDS;

private final ObjectStoreFileStorage objectStoreFileStorage;
private final FileStorage objectStoreFileStorage;
private final ApplicationConfiguration applicationConfiguration;
private final DatabaseHealthService databaseHealthService;
private final DatabaseMonitoringService databaseMonitoringService;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class ApplicationHealthCalculator {
private final ResilientOperationExecutor resilientOperationExecutor = getResilienceExecutor();

@Inject
public ApplicationHealthCalculator(@Autowired(required = false) ObjectStoreFileStorage objectStoreFileStorage,
public ApplicationHealthCalculator(@Autowired(required = false) FileStorage objectStoreFileStorage,
ApplicationConfiguration applicationConfiguration, DatabaseHealthService databaseHealthService,
DatabaseMonitoringService databaseMonitoringService,
DatabaseWaitingLocksAnalyzer databaseWaitingLocksAnalyzer) {
Expand Down
23 changes: 15 additions & 8 deletions multiapps-controller-persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,19 @@
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>azureblob</artifactId>
</dependency>
<dependency>
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>org.apache.jclouds.common</groupId>
<artifactId>googlecloud</artifactId>
</dependency>
<dependency>
<groupId>org.apache.jclouds</groupId>
<artifactId>jclouds-blobstore</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>google-cloud-storage</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
Expand Down Expand Up @@ -166,6 +168,11 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-nio</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
requires flowable.engine;
requires flowable.engine.common.api;
requires flowable.variable.service.api;
requires gax;
requires google.cloud.core;
requires google.cloud.nio;
requires google.cloud.storage;
requires jakarta.inject;
requires org.apache.logging.log4j;
requires org.apache.logging.log4j.core;
Expand All @@ -53,5 +57,7 @@
requires static org.immutables.value;
requires jakarta.xml.bind;
requires org.bouncycastle.fips.pkix;

requires com.google.auth.oauth2;
requires com.google.auth;
requires org.threeten.bp;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package org.cloudfoundry.multiapps.controller.persistence.services;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRetryStrategy;
import org.cloudfoundry.multiapps.controller.persistence.Messages;
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreUtil;
import org.springframework.http.MediaType;
import org.threeten.bp.Duration;

public class GcpObjectStoreFileStorage implements FileStorage {

private final String bucketName;
private final Storage storage;
private static final String BUCKET = "bucket";
private static final String BASE_64_ENCODED_PRIVATE_KEY_DATA = "base64EncodedPrivateKeyData";

public GcpObjectStoreFileStorage(Map<String, Object> credentials) {
this.bucketName = (String) credentials.get(BUCKET);
this.storage = createObjectStoreStorage(credentials);
}

protected Storage createObjectStoreStorage(Map<String, Object> credentials) {
return StorageOptions.newBuilder()
.setCredentials(getGcpCredentialsSupplier(credentials))
.setStorageRetryStrategy(StorageRetryStrategy.getDefaultStorageRetryStrategy())
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxAttempts(6)
.setInitialRetryDelay(Duration.ofMillis(250))
.setRetryDelayMultiplier(2.0)
.setMaxRetryDelay(Duration.ofSeconds(10))
.setInitialRpcTimeout(Duration.ofSeconds(60))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofSeconds(60))
.setTotalTimeout(Duration.ofMinutes(10))
.build())
.build()
.getService();
}

private Credentials getGcpCredentialsSupplier(Map<String, Object> credentials) {
if (!credentials.containsKey(BASE_64_ENCODED_PRIVATE_KEY_DATA)) {
return null;
}
byte[] decodedKey = Base64.getDecoder()
.decode((String) credentials.get(BASE_64_ENCODED_PRIVATE_KEY_DATA));
try {
return GoogleCredentials.fromStream(new ByteArrayInputStream(decodedKey));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

@Override
public void addFile(FileEntry fileEntry, InputStream content) throws FileStorageException {
BlobId blobId = BlobId.of(bucketName, fileEntry.getId());
BlobInfo blobInfo = BlobInfo.newBuilder(blobId)
.setContentDisposition(fileEntry.getName())
.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE)
.setMetadata(ObjectStoreUtil.createFileEntryMetadata(fileEntry))
.build();

putBlob(blobInfo, content);
}

private void putBlob(BlobInfo blobInfo, InputStream content) throws FileStorageException {
try {
storage.createFrom(blobInfo, content);
} catch (IOException | StorageException e) {
throw new FileStorageException(e);
}
}

@Override
public List<FileEntry> getFileEntriesWithoutContent(List<FileEntry> fileEntries) throws FileStorageException {
Set<String> existingFiles = getAllEntries().stream()
.map(Blob::getName)
.collect(Collectors.toSet());
return fileEntries.stream()
.filter(fileEntry -> !existingFiles.contains(fileEntry.getId()))
.toList();
}

@Override
public void deleteFile(String id, String space) {
storage.delete(bucketName, id);
}

@Override
public void deleteFilesBySpaceIds(List<String> spaceIds) {
removeBlobsByFilter(blob -> ObjectStoreUtil.filterBySpaceIds(blob.getMetadata(), spaceIds));
}

@Override
public void deleteFilesBySpaceAndNamespace(String space, String namespace) {
removeBlobsByFilter(blob -> ObjectStoreUtil.filterBySpaceAndNamespace(blob.getMetadata(), space, namespace));
}

@Override
public int deleteFilesModifiedBefore(LocalDateTime modificationTime) {
return removeBlobsByFilter(
blob -> ObjectStoreUtil.filterByModificationTime(blob.getMetadata(), blob.getName(), modificationTime));
}

@Override
public <T> T processFileContent(String space, String id,
FileContentProcessor<T> fileContentProcessor) throws FileStorageException {
FileEntry fileEntry = ObjectStoreUtil.createFileEntry(space, id);
try (InputStream inputStream = openBlobStream(fileEntry)) {
return fileContentProcessor.process(inputStream);
} catch (Exception e) {
throw new FileStorageException(e);
}
}

private InputStream openBlobStream(FileEntry fileEntry) throws FileStorageException {
Blob blob = getBlob(fileEntry);
return Channels.newInputStream(blob.reader());
}

private Blob getBlob(FileEntry fileEntry) throws FileStorageException {
try {
Blob blob = storage.get(bucketName, fileEntry.getId());
if (blob == null) {
throw new FileStorageException(
MessageFormat.format(Messages.FILE_WITH_ID_AND_SPACE_DOES_NOT_EXIST,
fileEntry.getId(), fileEntry.getSpace()));
}
return blob;
} catch (StorageException e) {
throw new FileStorageException(e);
}
}

@Override
public InputStream openInputStream(String space, String id) throws FileStorageException {
FileEntry fileEntry = ObjectStoreUtil.createFileEntry(space, id);
return openBlobStream(fileEntry);
}

@Override
public void testConnection() {
storage.get(bucketName, "test");
}

@Override
public void deleteFilesByIds(List<String> fileIds) throws FileStorageException {
removeBlobsByFilter(blob -> fileIds.contains(blob.getName()));
}

@Override
public <T> T processArchiveEntryContent(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
throws FileStorageException {
FileEntry fileEntry = ObjectStoreUtil.createFileEntry(fileContentToProcess.getSpaceGuid(), fileContentToProcess.getGuid());
InputStream blobPayload = getBlobPayloadWithOffset(fileEntry, fileContentToProcess.getStartOffset(),
fileContentToProcess.getEndOffset());
return processContent(fileContentProcessor, blobPayload);
}

private <T> T processContent(FileContentProcessor<T> fileContentProcessor, InputStream inputStream)
throws FileStorageException {
try {
return fileContentProcessor.process(inputStream);
} catch (IOException e) {
throw new FileStorageException(e);
}
}

public Set<Blob> getAllEntries() {
return storage.list(bucketName)
.streamAll()
.collect(Collectors.toSet());
}

protected int removeBlobsByFilter(Predicate<? super Blob> filter) {
List<BlobId> blobIds = getEntryNames(filter).stream()
.map(entry -> BlobId.of(bucketName, entry))
.toList();

if (!blobIds.isEmpty()) {
storage.delete(blobIds);
}
return blobIds.size();
}

protected Set<String> getEntryNames(Predicate<? super Blob> filter) {
return storage.list(bucketName)
.streamAll()
.filter(filter)
.map(Blob::getName)
.collect(Collectors.toSet());
}

private InputStream getBlobPayloadWithOffset(FileEntry fileEntry, long startOffset, long endOffset)
throws FileStorageException {
try {
Blob blob = getBlob(fileEntry);
ReadChannel reader = storage.reader(blob.getBlobId());
reader.seek(startOffset);
reader.limit(endOffset + 1);

return Channels.newInputStream(reader);
} catch (IOException | StorageException e) {
throw new FileStorageException(e);
}
}
}
Loading