Skip to content

Commit

Permalink
Refactor code and fix units.
Browse files Browse the repository at this point in the history
  • Loading branch information
CTMBNara committed Sep 2, 2024
1 parent f75d22d commit c86e9cc
Show file tree
Hide file tree
Showing 5 changed files with 445 additions and 416 deletions.
209 changes: 113 additions & 96 deletions src/main/java/org/prebid/server/settings/S3ApplicationSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.auction.model.Tuple2;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.execution.Timeout;
Expand All @@ -12,6 +14,7 @@
import org.prebid.server.settings.model.Account;
import org.prebid.server.settings.model.StoredDataResult;
import org.prebid.server.settings.model.StoredResponseDataResult;
import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -20,8 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -46,15 +50,14 @@ public class S3ApplicationSettings implements ApplicationSettings {
final JacksonMapper jacksonMapper;
final Vertx vertx;

public S3ApplicationSettings(
S3AsyncClient asyncClient,
String bucket,
String accountsDirectory,
String storedImpressionsDirectory,
String storedRequestsDirectory,
String storedResponsesDirectory,
JacksonMapper jacksonMapper,
Vertx vertx) {
public S3ApplicationSettings(S3AsyncClient asyncClient,
String bucket,
String accountsDirectory,
String storedImpressionsDirectory,
String storedRequestsDirectory,
String storedResponsesDirectory,
JacksonMapper jacksonMapper,
Vertx vertx) {

this.asyncClient = Objects.requireNonNull(asyncClient);
this.bucket = Objects.requireNonNull(bucket);
Expand All @@ -68,106 +71,97 @@ public S3ApplicationSettings(

@Override
public Future<Account> getAccountById(String accountId, Timeout timeout) {
return downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX)
.flatMap(fileContentOpt -> fileContentOpt.map(Future::succeededFuture)
.orElseGet(() -> Future.failedFuture(
new PreBidException("Account with id %s not found".formatted(accountId)))
)
)
.map(fileContent -> jacksonMapper.decodeValue(fileContent, Account.class))
.flatMap(account -> {
if (!Objects.equals(account.getId(), accountId)) {
return Future.failedFuture(new PreBidException(
"Account with id %s does not match id %s in file".formatted(accountId, account.getId()))
);
}
return Future.succeededFuture(account);
})
.recover(ex -> {
if (ex instanceof DecodeException) {
return Future
.failedFuture(
new PreBidException(
"Invalid json for account with id %s".formatted(accountId)));
}
// if a previous validation already yielded a PreBidException, just return it
if (ex instanceof PreBidException) {
return Future.failedFuture(ex);
}
return Future
.failedFuture(new PreBidException("Account with id %s not found".formatted(accountId)));
});
return withTimeout(() -> downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX), timeout)
.map(fileContent -> decodeAccount(fileContent, accountId));
}

private Account decodeAccount(String fileContent, String requestedAccountId) {
if (fileContent == null) {
throw new PreBidException("Account with id %s not found".formatted(requestedAccountId));
}

final Account account;
try {
account = jacksonMapper.decodeValue(fileContent, Account.class);
} catch (DecodeException e) {
throw new PreBidException("Invalid json for account with id %s".formatted(requestedAccountId));
}

validateAccount(account, requestedAccountId);
return account;
}

private static void validateAccount(Account account, String requestedAccountId) {
final String receivedAccountId = account != null ? account.getId() : null;
if (!StringUtils.equals(receivedAccountId, requestedAccountId)) {
throw new PreBidException(
"Account with id %s does not match id %s in file".formatted(requestedAccountId, receivedAccountId));
}
}

@Override
public Future<StoredDataResult> getStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {

return getFileContents(storedRequestsDirectory, requestIds)
.compose(storedIdToRequest -> getFileContents(storedImpressionsDirectory, impIds)
.map(storedIdToImp ->
buildStoredDataResult(storedIdToRequest, storedIdToImp, requestIds, impIds))
);
public Future<StoredDataResult> getStoredData(String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {

return withTimeout(
() -> Future.all(
getFileContents(storedRequestsDirectory, requestIds),
getFileContents(storedImpressionsDirectory, impIds)),
timeout)
.map(results -> buildStoredDataResult(
results.resultAt(0),
results.resultAt(1),
requestIds,
impIds));
}

private StoredDataResult buildStoredDataResult(
Map<String, String> storedIdToRequest,
Map<String, String> storedIdToImp,
Set<String> requestIds,
Set<String> impIds
) {

final Stream<String> missingStoredRequestIds =
getMissingStoredDataIds(storedIdToRequest, requestIds).stream()
.map("No stored request found for id: %s"::formatted);
final Stream<String> missingStoredImpressionIds =
getMissingStoredDataIds(storedIdToImp, impIds).stream()
.map("No stored impression found for id: %s"::formatted);

return StoredDataResult.of(
storedIdToRequest,
storedIdToImp,
Stream.concat(
missingStoredImpressionIds,
missingStoredRequestIds).toList());
private StoredDataResult buildStoredDataResult(Map<String, String> storedIdToRequest,
Map<String, String> storedIdToImp,
Set<String> requestIds,
Set<String> impIds) {

final List<String> errors = Stream.concat(
missingStoredDataIds(storedIdToImp, impIds).stream()
.map("No stored impression found for id: %s"::formatted),
missingStoredDataIds(storedIdToRequest, requestIds).stream()
.map("No stored request found for id: %s"::formatted))
.toList();

return StoredDataResult.of(storedIdToRequest, storedIdToImp, errors);
}

private Set<String> getMissingStoredDataIds(Map<String, String> fileContents, Set<String> responseIds) {
private Set<String> missingStoredDataIds(Map<String, String> fileContents, Set<String> responseIds) {
return SetUtils.difference(responseIds, fileContents.keySet());
}

@Override
public Future<StoredDataResult> getAmpStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {
public Future<StoredDataResult> getAmpStoredData(String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {

return getStoredData(accountId, requestIds, Collections.emptySet(), timeout);
}

@Override
public Future<StoredDataResult> getVideoStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {
public Future<StoredDataResult> getVideoStoredData(String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {

return getStoredData(accountId, requestIds, impIds, timeout);
}

@Override
public Future<StoredResponseDataResult> getStoredResponses(Set<String> responseIds, Timeout timeout) {
return getFileContents(storedResponsesDirectory, responseIds).map(storedIdToResponse -> {
final List<String> missingStoredResponseIds =
getMissingStoredDataIds(storedIdToResponse, responseIds).stream()
.map("No stored response found for id: %s"::formatted).toList();

return StoredResponseDataResult.of(storedIdToResponse, missingStoredResponseIds);
});
return withTimeout(() -> getFileContents(storedResponsesDirectory, responseIds), timeout)
.map(storedIdToResponse -> StoredResponseDataResult.of(
storedIdToResponse,
missingStoredDataIds(storedIdToResponse, responseIds).stream()
.map("No stored response found for id: %s"::formatted)
.toList()));
}

@Override
Expand All @@ -176,14 +170,13 @@ public Future<Map<String, String>> getCategories(String primaryAdServer, String
}

private Future<Map<String, String>> getFileContents(String directory, Set<String> ids) {
return CompositeFuture.join(ids.stream()
.<Future>map(impId -> downloadFile(directory + withInitialSlash(impId) + JSON_SUFFIX)
.map(fileContentOpt -> fileContentOpt
.map(fileContent -> Tuple2.of(impId, fileContent))))
return Future.join(ids.stream()
.map(impId -> downloadFile(directory + withInitialSlash(impId) + JSON_SUFFIX)
.map(fileContent -> Tuple2.of(impId, fileContent)))
.toList())
.map(CompositeFuture::<Optional<Tuple2<String, String>>>list)
.map(CompositeFuture::<Tuple2<String, String>>list)
.map(impIdToFileContent -> impIdToFileContent.stream()
.flatMap(Optional::stream)
.filter(tuple -> tuple.getRight() != null)
.collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight)));
}

Expand All @@ -198,13 +191,37 @@ private static String withInitialSlash(String impressionId) {
return impressionId.startsWith("/") ? impressionId : "/" + impressionId;
}

private Future<Optional<String>> downloadFile(String key) {
private Future<String> downloadFile(String key) {
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();

return Future.fromCompletionStage(
asyncClient.getObject(request, AsyncResponseTransformer.toBytes()),
vertx.getOrCreateContext())
.map(test -> Optional.of(test.asUtf8String())).recover(ex -> Future.succeededFuture(Optional.empty()));
.map(BytesWrapper::asUtf8String)
.otherwiseEmpty();
}

private <T> Future<T> withTimeout(Supplier<Future<T>> futureFactory, Timeout timeout) {
final long remainingTime = timeout.remaining();
if (remainingTime <= 0L) {
return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
}

final Promise<T> promise = Promise.promise();
final Future<T> future = futureFactory.get();

final long timerId = vertx.setTimer(remainingTime, id ->
promise.tryFail(new TimeoutException("Timeout has been exceeded")));

future.onComplete(result -> {
vertx.cancelTimer(timerId);
if (result.succeeded()) {
promise.tryComplete(result.result());
} else {
promise.tryFail(result.cause());
}
});

return promise.future();
}
}
Loading

0 comments on commit c86e9cc

Please sign in to comment.