diff --git a/docs/application-settings.md b/docs/application-settings.md index c51febaea3e..39fd52c7e5a 100644 --- a/docs/application-settings.md +++ b/docs/application-settings.md @@ -259,6 +259,51 @@ Here's an example YAML file containing account-specific settings: default: true ``` +## Setting Account Configuration in S3 + +This is identical to the account configuration in a file system, with the main difference that your file system is +[AWS S3](https://aws.amazon.com/de/s3/) or any S3 compatible storage, such as [MinIO](https://min.io/). + + +The general idea is that you'll place all the account-specific settings in a separate YAML file and point to that file. + +```yaml +settings: + s3: + accessKeyId: + secretAccessKey: + endpoint: # http://s3.storage.com + bucket: # prebid-application-settings + region: # if not provided AWS_GLOBAL will be used. Example value: 'eu-central-1' + accounts-dir: accounts + stored-imps-dir: stored-impressions + stored-requests-dir: stored-requests + stored-responses-dir: stored-responses + + # recommended to configure an in memory cache, but this is optional + in-memory-cache: + # example settings, tailor to your needs + cache-size: 100000 + ttl-seconds: 1200 # 20 minutes + # recommended to configure + s3-update: + refresh-rate: 900000 # Refresh every 15 minutes + timeout: 5000 +``` + +### File format + +We recommend using the `json` format for your account configuration. A minimal configuration may look like this. + +```json +{ + "id" : "979c7116-1f5a-43d4-9a87-5da3ccc4f52c", + "status" : "active" +} +``` + +This pairs nicely if you have a default configuration defined in your prebid server config under `settings.default-account-config`. + ## Setting Account Configuration in the Database In database approach account properties are stored in database table(s). diff --git a/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityBidResponsesScanHookTest.java b/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityBidResponsesScanHookTest.java index 2bd6a01b993..d4b39a8214e 100644 --- a/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityBidResponsesScanHookTest.java +++ b/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityBidResponsesScanHookTest.java @@ -71,11 +71,7 @@ public void setUp() { @Test public void codeShouldHaveValidConfigsWhenInitialized() { - // given - - // when - - // then + // when and then assertThat(target.code()).isEqualTo("confiant-ad-quality-bid-responses-scan-hook"); } diff --git a/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityModuleTest.java b/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityModuleTest.java index 33ad4eef240..41e63920319 100644 --- a/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityModuleTest.java +++ b/extra/modules/confiant-ad-quality/src/test/java/org/prebid/server/hooks/modules/com/confiant/adquality/v1/ConfiantAdQualityModuleTest.java @@ -8,11 +8,7 @@ public class ConfiantAdQualityModuleTest { @Test public void shouldHaveValidInitialConfigs() { - // given - - // when - - // then + // when and then assertThat(ConfiantAdQualityModule.CODE).isEqualTo("confiant-ad-quality"); } } diff --git a/extra/modules/fiftyone-devicedetection/src/test/java/org/prebid/server/hooks/modules/fiftyone/devicedetection/v1/hooks/FiftyOneDeviceDetectionRawAuctionRequestHookTest.java b/extra/modules/fiftyone-devicedetection/src/test/java/org/prebid/server/hooks/modules/fiftyone/devicedetection/v1/hooks/FiftyOneDeviceDetectionRawAuctionRequestHookTest.java index 5e3582b5297..cfd079299a0 100644 --- a/extra/modules/fiftyone-devicedetection/src/test/java/org/prebid/server/hooks/modules/fiftyone/devicedetection/v1/hooks/FiftyOneDeviceDetectionRawAuctionRequestHookTest.java +++ b/extra/modules/fiftyone-devicedetection/src/test/java/org/prebid/server/hooks/modules/fiftyone/devicedetection/v1/hooks/FiftyOneDeviceDetectionRawAuctionRequestHookTest.java @@ -402,7 +402,6 @@ public void callShouldReturnUpdateActionWhenFilterIsNull() { @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndNoAuctionContext() { // given - final AuctionInvocationContext context = AuctionInvocationContextImpl.of( null, null, @@ -470,7 +469,6 @@ public void callShouldReturnNoUpdateActionWhenWhitelistFilledAndNoAuctionContext @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndNoAccount() { // given - final AuctionContext auctionContext = AuctionContext.builder().build(); final AuctionInvocationContext context = AuctionInvocationContextImpl.of( null, @@ -493,7 +491,6 @@ public void callShouldReturnUpdateActionWhenNoWhitelistAndNoAccount() { @Test public void callShouldReturnNoUpdateActionWhenNoWhitelistAndNoAccountButDeviceIdIsSet() { // given - final AuctionContext auctionContext = AuctionContext.builder().build(); final AuctionInvocationContext context = AuctionInvocationContextImpl.of( null, @@ -568,7 +565,6 @@ public void callShouldReturnNoUpdateActionWhenWhitelistFilledAndNoAccount() { @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndNoAccountID() { // given - final AuctionContext auctionContext = AuctionContext.builder() .account(Account.builder() .build()) @@ -648,7 +644,6 @@ public void callShouldReturnNoUpdateActionWhenWhitelistFilledAndNoAccountID() { @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndEmptyAccountID() { // given - final AuctionContext auctionContext = AuctionContext.builder() .account(Account.builder() .id("") @@ -731,7 +726,6 @@ public void callShouldReturnNoUpdateActionWhenWhitelistFilledAndEmptyAccountID() @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndAllowedAccountID() { // given - final AuctionContext auctionContext = AuctionContext.builder() .account(Account.builder() .id("42") @@ -814,7 +808,6 @@ public void callShouldReturnUpdateActionWhenWhitelistFilledAndAllowedAccountID() @Test public void callShouldReturnUpdateActionWhenNoWhitelistAndNotAllowedAccountID() { // given - final AuctionContext auctionContext = AuctionContext.builder() .account(Account.builder() .id("29") diff --git a/extra/pom.xml b/extra/pom.xml index 43cc49760ab..876f4447d7d 100644 --- a/extra/pom.xml +++ b/extra/pom.xml @@ -52,6 +52,7 @@ 3.21.7 3.17.3 1.0.7 + 2.26.24 3.9.1 @@ -212,6 +213,11 @@ geoip2 ${maxmind-client.version} + + software.amazon.awssdk + s3 + ${aws.awssdk.version} + com.google.protobuf protobuf-java diff --git a/pom.xml b/pom.xml index b678589a9e2..f02c03d7f4a 100644 --- a/pom.xml +++ b/pom.xml @@ -170,6 +170,10 @@ org.postgresql postgresql + + software.amazon.awssdk + s3 + com.github.ben-manes.caffeine caffeine @@ -328,6 +332,11 @@ mysql test + + org.testcontainers + localstack + test + org.testcontainers postgresql diff --git a/sample/configs/prebid-config-s3.yaml b/sample/configs/prebid-config-s3.yaml new file mode 100644 index 00000000000..277ad94613c --- /dev/null +++ b/sample/configs/prebid-config-s3.yaml @@ -0,0 +1,60 @@ +status-response: "ok" + +server: + enable-quickack: true + enable-reuseport: true + +adapters: + appnexus: + enabled: true + ix: + enabled: true + openx: + enabled: true + pubmatic: + enabled: true + rubicon: + enabled: true +metrics: + prefix: prebid +cache: + scheme: http + host: localhost + path: /cache + query: uuid= +settings: + enforce-valid-account: false + generate-storedrequest-bidrequest-id: true + s3: + accessKeyId: prebid-server-test + secretAccessKey: nq9h6whXQURNL2NnWg3rcMlLMtGGDJeWrdl8hC9g + endpoint: http://localhost:9000 + bucket: prebid-server-configs.example.com # prebid-application-settings + force-path-style: true # virtual bucketing + # region: # if not provided AWS_GLOBAL will be used. Example value: 'eu-central-1' + accounts-dir: accounts + stored-imps-dir: stored-impressions + stored-requests-dir: stored-requests + stored-responses-dir: stored-responses + + in-memory-cache: + cache-size: 10000 + ttl-seconds: 1200 # 20 minutes + s3-update: + refresh-rate: 900000 # Refresh every 15 minutes + timeout: 5000 + +gdpr: + default-value: 1 + vendorlist: + v2: + cache-dir: /var/tmp/vendor2 + v3: + cache-dir: /var/tmp/vendor3 + +admin-endpoints: + logging-changelevel: + enabled: true + path: /logging/changelevel + on-application-port: true + protected: false diff --git a/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java new file mode 100644 index 00000000000..f6198a5ad94 --- /dev/null +++ b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java @@ -0,0 +1,227 @@ +package org.prebid.server.settings; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.commons.collections4.SetUtils; +import org.apache.commons.lang3.StringUtils; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.json.DecodeException; +import org.prebid.server.json.JacksonMapper; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.settings.model.StoredResponseDataResult; +import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Implementation of {@link ApplicationSettings}. + *

+ * Reads an application settings from JSON file in a s3 bucket, stores and serves them in and from the memory. + *

+ * Immediately loads stored request data from local files. These are stored in memory for low-latency reads. + * This expects each file in the directory to be named "{config_id}.json". + */ +public class S3ApplicationSettings implements ApplicationSettings { + + private static final String JSON_SUFFIX = ".json"; + + final S3AsyncClient asyncClient; + final String bucket; + final String accountsDirectory; + final String storedImpressionsDirectory; + final String storedRequestsDirectory; + final String storedResponsesDirectory; + final JacksonMapper jacksonMapper; + final Vertx vertx; + + public S3ApplicationSettings(S3AsyncClient asyncClient, + String bucket, + String accountsDirectory, + String storedImpressionsDirectory, + String storedRequestsDirectory, + String storedResponsesDirectory, + JacksonMapper jacksonMapper, + Vertx vertx) { + + this.asyncClient = Objects.requireNonNull(asyncClient); + this.bucket = Objects.requireNonNull(bucket); + this.accountsDirectory = Objects.requireNonNull(accountsDirectory); + this.storedImpressionsDirectory = Objects.requireNonNull(storedImpressionsDirectory); + this.storedRequestsDirectory = Objects.requireNonNull(storedRequestsDirectory); + this.storedResponsesDirectory = Objects.requireNonNull(storedResponsesDirectory); + this.jacksonMapper = Objects.requireNonNull(jacksonMapper); + this.vertx = Objects.requireNonNull(vertx); + } + + @Override + public Future getAccountById(String accountId, Timeout timeout) { + return withTimeout(() -> downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX), timeout) + .map(fileContent -> decodeAccount(fileContent, accountId)); + } + + private Account decodeAccount(String fileContent, String requestedAccountId) { + if (fileContent == null) { + throw new PreBidException("Account with id %s not found".formatted(requestedAccountId)); + } + + final Account account; + try { + account = jacksonMapper.decodeValue(fileContent, Account.class); + } catch (DecodeException e) { + throw new PreBidException("Invalid json for account with id %s".formatted(requestedAccountId)); + } + + validateAccount(account, requestedAccountId); + return account; + } + + private static void validateAccount(Account account, String requestedAccountId) { + final String receivedAccountId = account != null ? account.getId() : null; + if (!StringUtils.equals(receivedAccountId, requestedAccountId)) { + throw new PreBidException( + "Account with id %s does not match id %s in file".formatted(requestedAccountId, receivedAccountId)); + } + } + + @Override + public Future getStoredData(String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return withTimeout( + () -> Future.all( + getFileContents(storedRequestsDirectory, requestIds), + getFileContents(storedImpressionsDirectory, impIds)), + timeout) + .map(results -> buildStoredDataResult( + results.resultAt(0), + results.resultAt(1), + requestIds, + impIds)); + } + + private StoredDataResult buildStoredDataResult(Map storedIdToRequest, + Map storedIdToImp, + Set requestIds, + Set impIds) { + + final List errors = Stream.concat( + missingStoredDataIds(storedIdToImp, impIds).stream() + .map("No stored impression found for id: %s"::formatted), + missingStoredDataIds(storedIdToRequest, requestIds).stream() + .map("No stored request found for id: %s"::formatted)) + .toList(); + + return StoredDataResult.of(storedIdToRequest, storedIdToImp, errors); + } + + private Set missingStoredDataIds(Map fileContents, Set responseIds) { + return SetUtils.difference(responseIds, fileContents.keySet()); + } + + @Override + public Future getAmpStoredData(String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getStoredData(accountId, requestIds, Collections.emptySet(), timeout); + } + + @Override + public Future getVideoStoredData(String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getStoredData(accountId, requestIds, impIds, timeout); + } + + @Override + public Future getStoredResponses(Set responseIds, Timeout timeout) { + return withTimeout(() -> getFileContents(storedResponsesDirectory, responseIds), timeout) + .map(storedIdToResponse -> StoredResponseDataResult.of( + storedIdToResponse, + missingStoredDataIds(storedIdToResponse, responseIds).stream() + .map("No stored response found for id: %s"::formatted) + .toList())); + } + + @Override + public Future> getCategories(String primaryAdServer, String publisher, Timeout timeout) { + return Future.succeededFuture(Collections.emptyMap()); + } + + private Future> getFileContents(String directory, Set ids) { + return Future.join(ids.stream() + .map(impId -> downloadFile(directory + withInitialSlash(impId) + JSON_SUFFIX) + .map(fileContent -> Tuple2.of(impId, fileContent))) + .toList()) + .map(CompositeFuture::>list) + .map(impIdToFileContent -> impIdToFileContent.stream() + .filter(tuple -> tuple.getRight() != null) + .collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight))); + } + + /** + * When the impression id is the ad unit path it may already start with a slash and there's no need to add + * another one. + * + * @param impressionId from the bid request + * @return impression id with only a single slash at the beginning + */ + private static String withInitialSlash(String impressionId) { + return impressionId.startsWith("/") ? impressionId : "/" + impressionId; + } + + private Future downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage( + asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), + vertx.getOrCreateContext()) + .map(BytesWrapper::asUtf8String) + .otherwiseEmpty(); + } + + private Future withTimeout(Supplier> futureFactory, Timeout timeout) { + final long remainingTime = timeout.remaining(); + if (remainingTime <= 0L) { + return Future.failedFuture(new TimeoutException("Timeout has been exceeded")); + } + + final Promise promise = Promise.promise(); + final Future future = futureFactory.get(); + + final long timerId = vertx.setTimer(remainingTime, id -> + promise.tryFail(new TimeoutException("Timeout has been exceeded"))); + + future.onComplete(result -> { + vertx.cancelTimer(timerId); + if (result.succeeded()) { + promise.tryComplete(result.result()); + } else { + promise.tryFail(result.cause()); + } + }); + + return promise.future(); + } +} diff --git a/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java new file mode 100644 index 00000000000..d5a8ce7f873 --- /dev/null +++ b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java @@ -0,0 +1,146 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.log.Logger; +import org.prebid.server.log.LoggerFactory; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.vertx.Initializable; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + *

+ * Service that periodically calls s3 for stored request updates. + * If refreshRate is negative, then the data will never be refreshed. + *

+ * Fetches all files from the specified folders/prefixes in s3 and downloads all files. + */ +public class S3PeriodicRefreshService implements Initializable { + + private static final String JSON_SUFFIX = ".json"; + + private static final Logger logger = LoggerFactory.getLogger(S3PeriodicRefreshService.class); + + private final S3AsyncClient asyncClient; + private final String bucket; + private final String storedRequestsDirectory; + private final String storedImpressionsDirectory; + private final long refreshPeriod; + private final CacheNotificationListener cacheNotificationListener; + private final MetricName cacheType; + private final Clock clock; + private final Metrics metrics; + private final Vertx vertx; + + public S3PeriodicRefreshService(S3AsyncClient asyncClient, + String bucket, + String storedRequestsDirectory, + String storedImpressionsDirectory, + long refreshPeriod, + CacheNotificationListener cacheNotificationListener, + MetricName cacheType, + Clock clock, + Metrics metrics, + Vertx vertx) { + + this.asyncClient = Objects.requireNonNull(asyncClient); + this.bucket = Objects.requireNonNull(bucket); + this.storedRequestsDirectory = Objects.requireNonNull(storedRequestsDirectory); + this.storedImpressionsDirectory = Objects.requireNonNull(storedImpressionsDirectory); + this.refreshPeriod = refreshPeriod; + this.cacheNotificationListener = Objects.requireNonNull(cacheNotificationListener); + this.cacheType = Objects.requireNonNull(cacheType); + this.clock = Objects.requireNonNull(clock); + this.metrics = Objects.requireNonNull(metrics); + this.vertx = Objects.requireNonNull(vertx); + } + + @Override + public void initialize(Promise initializePromise) { + fetchStoredDataResult(clock.millis(), MetricName.initialize) + .mapEmpty() + .onComplete(initializePromise); + + if (refreshPeriod > 0) { + logger.info("Starting s3 periodic refresh for " + cacheType + " every " + refreshPeriod + " s"); + vertx.setPeriodic(refreshPeriod, ignored -> fetchStoredDataResult(clock.millis(), MetricName.update)); + } + } + + private Future fetchStoredDataResult(long startTime, MetricName metricName) { + return Future.all( + getFileContentsForDirectory(storedRequestsDirectory), + getFileContentsForDirectory(storedImpressionsDirectory)) + .map(CompositeFuture::>list) + .map(results -> StoredDataResult.of(results.getFirst(), results.get(1), Collections.emptyList())) + .onSuccess(storedDataResult -> handleResult(storedDataResult, startTime, metricName)) + .onFailure(exception -> handleFailure(exception, startTime, metricName)); + } + + private Future> getFileContentsForDirectory(String directory) { + return listFiles(directory) + .map(files -> files.stream().map(this::downloadFile).toList()) + .compose(Future::all) + .map(CompositeFuture::>list) + .map(fileNameToContent -> fileNameToContent.stream() + .collect(Collectors.toMap( + entry -> stripFileName(directory, entry.getLeft()), + Tuple2::getRight))); + } + + private Future> listFiles(String prefix) { + final ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() + .bucket(bucket) + .prefix(prefix) + .build(); + + return Future.fromCompletionStage(asyncClient.listObjects(listObjectsRequest), vertx.getOrCreateContext()) + .map(response -> response.contents().stream() + .map(S3Object::key) + .collect(Collectors.toList())); + } + + private Future> downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage( + asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), + vertx.getOrCreateContext()) + .map(content -> Tuple2.of(key, content.asUtf8String())); + } + + private static String stripFileName(String directory, String name) { + return name + .replace(directory + "/", "") + .replace(JSON_SUFFIX, ""); + } + + private void handleResult(StoredDataResult storedDataResult, long startTime, MetricName refreshType) { + cacheNotificationListener.save(storedDataResult.getStoredIdToRequest(), storedDataResult.getStoredIdToImp()); + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + } + + private void handleFailure(Throwable exception, long startTime, MetricName refreshType) { + logger.warn("Error occurred while request to s3 refresh service", exception); + + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + metrics.updateSettingsCacheRefreshErrorMetric(cacheType, refreshType); + } +} diff --git a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java index 1006403c9c4..f7aaa9bb4ba 100644 --- a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java +++ b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java @@ -20,10 +20,12 @@ import org.prebid.server.settings.EnrichingApplicationSettings; import org.prebid.server.settings.FileApplicationSettings; import org.prebid.server.settings.HttpApplicationSettings; +import org.prebid.server.settings.S3ApplicationSettings; import org.prebid.server.settings.SettingsCache; import org.prebid.server.settings.helper.ParametrizedQueryHelper; import org.prebid.server.settings.service.DatabasePeriodicRefreshService; import org.prebid.server.settings.service.HttpPeriodicRefreshService; +import org.prebid.server.settings.service.S3PeriodicRefreshService; import org.prebid.server.spring.config.database.DatabaseConfiguration; import org.prebid.server.vertx.database.DatabaseClient; import org.prebid.server.vertx.httpclient.HttpClient; @@ -37,12 +39,20 @@ import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; - -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotNull; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Clock; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Stream; @UtilityClass @@ -217,6 +227,115 @@ public DatabasePeriodicRefreshService ampDatabasePeriodicRefreshService( } } + @Configuration + @ConditionalOnProperty(prefix = "settings.s3", name = {"accounts-dir", "stored-imps-dir", "stored-requests-dir"}) + static class S3SettingsConfiguration { + + @Component + @ConfigurationProperties(prefix = "settings.s3") + @ConditionalOnProperty(prefix = "settings.s3", name = {"accessKeyId", "secretAccessKey"}) + @Validated + @Data + @NoArgsConstructor + protected static class S3ConfigurationProperties { + + @NotBlank + private String accessKeyId; + + @NotBlank + private String secretAccessKey; + + /** + * If not provided AWS_GLOBAL will be used as a region + */ + private String region; + + @NotBlank + private String endpoint; + + @NotBlank + private String bucket; + + @NotBlank + private Boolean forcePathStyle; + + @NotBlank + private String accountsDir; + + @NotBlank + private String storedImpsDir; + + @NotBlank + private String storedRequestsDir; + + @NotBlank + private String storedResponsesDir; + } + + @Bean + S3AsyncClient s3AsyncClient(S3ConfigurationProperties s3ConfigurationProperties) throws URISyntaxException { + final AwsBasicCredentials credentials = AwsBasicCredentials.create( + s3ConfigurationProperties.getAccessKeyId(), + s3ConfigurationProperties.getSecretAccessKey()); + final Region awsRegion = Optional.ofNullable(s3ConfigurationProperties.getRegion()) + .map(Region::of) + .orElse(Region.AWS_GLOBAL); + + return S3AsyncClient + .builder() + .credentialsProvider(StaticCredentialsProvider.create(credentials)) + .endpointOverride(new URI(s3ConfigurationProperties.getEndpoint())) + .forcePathStyle(s3ConfigurationProperties.getForcePathStyle()) + .region(awsRegion) + .build(); + } + + @Bean + S3ApplicationSettings s3ApplicationSettings(S3AsyncClient s3AsyncClient, + S3ConfigurationProperties s3ConfigurationProperties, + JacksonMapper mapper, + Vertx vertx) { + + return new S3ApplicationSettings( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getAccountsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredResponsesDir(), + mapper, + vertx); + } + } + + @Configuration + @ConditionalOnProperty(prefix = "settings.in-memory-cache.s3-update", name = {"refresh-rate", "timeout"}) + static class S3PeriodicRefreshServiceConfiguration { + + @Bean + public S3PeriodicRefreshService s3PeriodicRefreshService( + S3AsyncClient s3AsyncClient, + S3SettingsConfiguration.S3ConfigurationProperties s3ConfigurationProperties, + @Value("${settings.in-memory-cache.s3-update.refresh-rate}") long refreshPeriod, + SettingsCache settingsCache, + Clock clock, + Metrics metrics, + Vertx vertx) { + + return new S3PeriodicRefreshService( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + refreshPeriod, + settingsCache, + MetricName.stored_request, + clock, + metrics, + vertx); + } + } + /** * This configuration defines a collection of application settings fetchers and its ordering. */ @@ -227,14 +346,16 @@ static class CompositeSettingsConfiguration { CompositeApplicationSettings compositeApplicationSettings( @Autowired(required = false) FileApplicationSettings fileApplicationSettings, @Autowired(required = false) DatabaseApplicationSettings databaseApplicationSettings, - @Autowired(required = false) HttpApplicationSettings httpApplicationSettings) { + @Autowired(required = false) HttpApplicationSettings httpApplicationSettings, + @Autowired(required = false) S3ApplicationSettings s3ApplicationSettings) { - final List applicationSettingsList = - Stream.of(fileApplicationSettings, - databaseApplicationSettings, - httpApplicationSettings) - .filter(Objects::nonNull) - .toList(); + final List applicationSettingsList = Stream.of( + fileApplicationSettings, + databaseApplicationSettings, + s3ApplicationSettings, + httpApplicationSettings) + .filter(Objects::nonNull) + .toList(); return new CompositeApplicationSettings(applicationSettingsList); } @@ -338,7 +459,7 @@ SettingsCache videoSettingCache(ApplicationSettingsCacheProperties cacheProperti @Validated @Data @NoArgsConstructor - private static class ApplicationSettingsCacheProperties { + protected static class ApplicationSettingsCacheProperties { @NotNull @Min(1) diff --git a/src/test/groovy/org/prebid/server/functional/service/S3Service.groovy b/src/test/groovy/org/prebid/server/functional/service/S3Service.groovy new file mode 100644 index 00000000000..4a25b6d6ca0 --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/service/S3Service.groovy @@ -0,0 +1,103 @@ +package org.prebid.server.functional.service + +import org.prebid.server.functional.model.config.AccountConfig +import org.prebid.server.functional.model.db.StoredImp +import org.prebid.server.functional.model.db.StoredRequest +import org.prebid.server.functional.model.db.StoredResponse +import org.prebid.server.functional.util.ObjectMapperWrapper +import org.testcontainers.containers.localstack.LocalStackContainer +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.CreateBucketRequest +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import software.amazon.awssdk.services.s3.model.PutObjectResponse + +final class S3Service implements ObjectMapperWrapper { + + private final S3Client s3PbsService + private final LocalStackContainer localStackContainer + + static final def DEFAULT_ACCOUNT_DIR = 'account' + static final def DEFAULT_IMPS_DIR = 'stored-impressions' + static final def DEFAULT_REQUEST_DIR = 'stored-requests' + static final def DEFAULT_RESPONSE_DIR = 'stored-responses' + + S3Service(LocalStackContainer localStackContainer) { + this.localStackContainer = localStackContainer + s3PbsService = S3Client.builder() + .endpointOverride(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + localStackContainer.getAccessKey(), + localStackContainer.getSecretKey()))) + .region(Region.of(localStackContainer.getRegion())) + .build() + } + + String getAccessKeyId() { + localStackContainer.accessKey + } + + String getSecretKeyId() { + localStackContainer.secretKey + } + + String getEndpoint() { + "http://${localStackContainer.getNetworkAliases().get(0)}:${localStackContainer.getExposedPorts().get(0)}" + } + + String getRegion() { + localStackContainer.region + } + + void createBucket(String bucketName) { + CreateBucketRequest createBucketRequest = CreateBucketRequest.builder() + .bucket(bucketName) + .build() + s3PbsService.createBucket(createBucketRequest) + } + + void deleteBucket(String bucketName) { + DeleteBucketRequest deleteBucketRequest = DeleteBucketRequest.builder() + .bucket(bucketName) + .build() + s3PbsService.deleteBucket(deleteBucketRequest) + } + + void purgeBucketFiles(String bucketName) { + s3PbsService.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).build()).contents().each { files -> + s3PbsService.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(files.key()).build()) + } + } + + PutObjectResponse uploadAccount(String bucketName, AccountConfig account, String fileName = account.id) { + uploadFile(bucketName, encode(account), "${DEFAULT_ACCOUNT_DIR}/${fileName}.json") + } + + PutObjectResponse uploadStoredRequest(String bucketName, StoredRequest storedRequest, String fileName = storedRequest.requestId) { + uploadFile(bucketName, encode(storedRequest.requestData), "${DEFAULT_REQUEST_DIR}/${fileName}.json") + } + + PutObjectResponse uploadStoredResponse(String bucketName, StoredResponse storedRequest, String fileName = storedRequest.responseId) { + uploadFile(bucketName, encode(storedRequest.storedAuctionResponse), "${DEFAULT_RESPONSE_DIR}/${fileName}.json") + } + + PutObjectResponse uploadStoredImp(String bucketName, StoredImp storedImp, String fileName = storedImp.impId) { + uploadFile(bucketName, encode(storedImp.impData), "${DEFAULT_IMPS_DIR}/${fileName}.json") + } + + PutObjectResponse uploadFile(String bucketName, String fileBody, String path) { + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(bucketName) + .key(path) + .build() + s3PbsService.putObject(putObjectRequest, RequestBody.fromString(fileBody)) + } +} diff --git a/src/test/groovy/org/prebid/server/functional/testcontainers/Dependencies.groovy b/src/test/groovy/org/prebid/server/functional/testcontainers/Dependencies.groovy index 53cbecf2289..ef2575ea3ed 100644 --- a/src/test/groovy/org/prebid/server/functional/testcontainers/Dependencies.groovy +++ b/src/test/groovy/org/prebid/server/functional/testcontainers/Dependencies.groovy @@ -4,8 +4,10 @@ import org.prebid.server.functional.testcontainers.container.NetworkServiceConta import org.prebid.server.functional.util.SystemProperties import org.testcontainers.containers.MySQLContainer import org.testcontainers.containers.Network +import org.testcontainers.containers.localstack.LocalStackContainer import org.testcontainers.containers.PostgreSQLContainer import org.testcontainers.lifecycle.Startables +import org.testcontainers.utility.DockerImageName import static org.prebid.server.functional.util.SystemProperties.MOCKSERVER_VERSION @@ -34,16 +36,20 @@ class Dependencies { static final NetworkServiceContainer networkServiceContainer = new NetworkServiceContainer(MOCKSERVER_VERSION) .withNetwork(network) + static final LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse("localstack/localstack:s3-latest")) + .withNetwork(Dependencies.network) + .withServices(LocalStackContainer.Service.S3) + static void start() { if (IS_LAUNCH_CONTAINERS) { - Startables.deepStart([networkServiceContainer, mysqlContainer]) + Startables.deepStart([networkServiceContainer, mysqlContainer, localStackContainer]) .join() } } static void stop() { if (IS_LAUNCH_CONTAINERS) { - [networkServiceContainer, mysqlContainer].parallelStream() + [networkServiceContainer, mysqlContainer, localStackContainer].parallelStream() .forEach({ it.stop() }) } } diff --git a/src/test/groovy/org/prebid/server/functional/tests/storage/AccountS3Spec.groovy b/src/test/groovy/org/prebid/server/functional/tests/storage/AccountS3Spec.groovy new file mode 100644 index 00000000000..3a87be7b9e7 --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/tests/storage/AccountS3Spec.groovy @@ -0,0 +1,118 @@ +package org.prebid.server.functional.tests.storage + +import org.prebid.server.functional.model.AccountStatus +import org.prebid.server.functional.model.config.AccountConfig +import org.prebid.server.functional.model.request.auction.BidRequest +import org.prebid.server.functional.service.PrebidServerException +import org.prebid.server.functional.service.PrebidServerService +import org.prebid.server.functional.service.S3Service +import org.prebid.server.functional.testcontainers.PbsServiceFactory +import org.prebid.server.functional.util.PBSUtils + +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED + +class AccountS3Spec extends StorageBaseSpec { + + protected PrebidServerService s3StorageAccountPbsService = PbsServiceFactory.getService(s3StorageConfig + + mySqlDisabledConfig + + ['settings.enforce-valid-account': 'true']) + + def "PBS should process request when active account is present in S3 storage"() { + given: "Default BidRequest with account" + def accountId = PBSUtils.randomNumber as String + def bidRequest = BidRequest.defaultBidRequest.tap { + setAccountId(accountId) + } + + and: "Active account config" + def account = new AccountConfig(id: accountId, status: AccountStatus.ACTIVE) + + and: "Saved account in AWS S3 storage" + s3Service.uploadAccount(DEFAULT_BUCKET, account) + + when: "PBS processes auction request" + def response = s3StorageAccountPbsService.sendAuctionRequest(bidRequest) + + then: "Response should contain seatbid" + assert response.seatbid.size() == 1 + } + + def "PBS should throw exception when inactive account is present in S3 storage"() { + given: "Default BidRequest with account" + def accountId = PBSUtils.randomNumber as String + def bidRequest = BidRequest.defaultBidRequest.tap { + setAccountId(accountId) + } + + and: "Inactive account config" + def account = new AccountConfig(id: accountId, status: AccountStatus.INACTIVE) + + and: "Saved account in AWS S3 storage" + s3Service.uploadAccount(DEFAULT_BUCKET, account) + + when: "PBS processes auction request" + s3StorageAccountPbsService.sendAuctionRequest(bidRequest) + + then: "PBS should reject the entire auction" + def exception = thrown(PrebidServerException) + assert exception.statusCode == UNAUTHORIZED.code() + assert exception.responseBody == "Account $accountId is inactive" + } + + def "PBS should throw exception when account id isn't match with bid request account id"() { + given: "Default BidRequest with account" + def accountId = PBSUtils.randomNumber as String + def bidRequest = BidRequest.defaultBidRequest.tap { + setAccountId(accountId) + } + + and: "Account config with different accountId" + def account = new AccountConfig(id: PBSUtils.randomString, status: AccountStatus.ACTIVE) + + and: "Saved account in AWS S3 storage" + s3Service.uploadAccount(DEFAULT_BUCKET, account, accountId) + + when: "PBS processes auction request" + s3StorageAccountPbsService.sendAuctionRequest(bidRequest) + + then: "PBS should reject the entire auction" + def exception = thrown(PrebidServerException) + assert exception.statusCode == UNAUTHORIZED.code() + assert exception.responseBody == "Unauthorized account id: ${accountId}" + } + + def "PBS should throw exception when account is invalid in S3 storage json file"() { + given: "Default BidRequest" + def accountId = PBSUtils.randomNumber as String + def bidRequest = BidRequest.defaultBidRequest.tap { + setAccountId(accountId) + } + + and: "Saved invalid account in AWS S3 storage" + s3Service.uploadFile(DEFAULT_BUCKET, INVALID_FILE_BODY, "${S3Service.DEFAULT_ACCOUNT_DIR}/${accountId}.json") + + when: "PBS processes auction request" + s3StorageAccountPbsService.sendAuctionRequest(bidRequest) + + then: "PBS should reject the entire auction" + def exception = thrown(PrebidServerException) + assert exception.statusCode == UNAUTHORIZED.code() + assert exception.responseBody == "Unauthorized account id: ${accountId}" + } + + def "PBS should throw exception when account is not present in S3 storage and valid account enforced"() { + given: "Default BidRequest" + def accountId = PBSUtils.randomNumber as String + def bidRequest = BidRequest.defaultBidRequest.tap { + setAccountId(accountId) + } + + when: "PBS processes auction request" + s3StorageAccountPbsService.sendAuctionRequest(bidRequest) + + then: "PBS should reject the entire auction" + def exception = thrown(PrebidServerException) + assert exception.statusCode == UNAUTHORIZED.code() + assert exception.responseBody == "Unauthorized account id: ${accountId}" + } +} diff --git a/src/test/groovy/org/prebid/server/functional/tests/storage/AmpS3Spec.groovy b/src/test/groovy/org/prebid/server/functional/tests/storage/AmpS3Spec.groovy new file mode 100644 index 00000000000..e6dda6b407c --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/tests/storage/AmpS3Spec.groovy @@ -0,0 +1,115 @@ +package org.prebid.server.functional.tests.storage + +import org.prebid.server.functional.model.db.StoredRequest +import org.prebid.server.functional.model.request.amp.AmpRequest +import org.prebid.server.functional.model.request.auction.BidRequest +import org.prebid.server.functional.model.request.auction.Site +import org.prebid.server.functional.service.PrebidServerException +import org.prebid.server.functional.service.S3Service +import org.prebid.server.functional.util.PBSUtils +import spock.lang.PendingFeature + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST + +class AmpS3Spec extends StorageBaseSpec { + + def "PBS should take parameters from the stored request on S3 service when it's not specified in the request"() { + given: "AMP request" + def ampRequest = new AmpRequest(tagId: PBSUtils.randomString).tap { + account = PBSUtils.randomNumber as String + } + + and: "Default stored request" + def ampStoredRequest = BidRequest.defaultStoredRequest.tap { + site = Site.defaultSite + setAccountId(ampRequest.account) + } + + and: "Stored request in S3 service" + def storedRequest = StoredRequest.getStoredRequest(ampRequest, ampStoredRequest) + s3Service.uploadStoredRequest(DEFAULT_BUCKET, storedRequest) + + when: "PBS processes amp request" + s3StoragePbsService.sendAmpRequest(ampRequest) + + then: "Bidder request should contain parameters from the stored request" + def bidderRequest = bidder.getBidderRequest(ampStoredRequest.id) + + assert bidderRequest.site?.page == ampStoredRequest.site.page + assert bidderRequest.site?.publisher?.id == ampStoredRequest.site.publisher.id + assert !bidderRequest.imp[0]?.tagId + assert bidderRequest.imp[0]?.banner?.format[0]?.height == ampStoredRequest.imp[0].banner.format[0].height + assert bidderRequest.imp[0]?.banner?.format[0]?.weight == ampStoredRequest.imp[0].banner.format[0].weight + assert bidderRequest.regs?.gdpr == ampStoredRequest.regs.gdpr + } + + @PendingFeature + def "PBS should throw exception when trying to take parameters from the stored request on S3 service with invalid id in file"() { + given: "AMP request" + def ampRequest = new AmpRequest(tagId: PBSUtils.randomString).tap { + account = PBSUtils.randomNumber as String + } + + and: "Default stored request" + def ampStoredRequest = BidRequest.defaultStoredRequest.tap { + site = Site.defaultSite + setAccountId(ampRequest.account) + } + + and: "Stored request in S3 service" + def storedRequest = StoredRequest.getStoredRequest(ampRequest, ampStoredRequest).tap { + it.requestId = PBSUtils.randomNumber + } + s3Service.uploadStoredRequest(DEFAULT_BUCKET, storedRequest, ampRequest.tagId) + + when: "PBS processes amp request" + s3StoragePbsService.sendAmpRequest(ampRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "No stored request found for id: ${ampRequest.tagId}" + } + + def "PBS should throw exception when trying to take parameters from request where id isn't match with stored request id"() { + given: "AMP request" + def ampRequest = new AmpRequest(tagId: PBSUtils.randomString).tap { + account = PBSUtils.randomNumber as String + } + + and: "Default stored request" + def ampStoredRequest = BidRequest.defaultStoredRequest.tap { + site = Site.defaultSite + setAccountId(ampRequest.account) + } + + and: "Stored request in S3 service" + s3Service.uploadFile(DEFAULT_BUCKET, INVALID_FILE_BODY, "${S3Service.DEFAULT_REQUEST_DIR}/${ampRequest.tagId}.json") + + when: "PBS processes amp request" + s3StoragePbsService.sendAmpRequest(ampRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "Can't parse Json for stored request with id ${ampRequest.tagId}" + } + + def "PBS should throw an exception when trying to take parameters from stored request on S3 service that do not exist"() { + given: "AMP request" + def ampRequest = new AmpRequest(tagId: PBSUtils.randomString).tap { + account = PBSUtils.randomNumber as String + } + + when: "PBS processes amp request" + s3StoragePbsService.sendAmpRequest(ampRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "No stored request found for id: ${ampRequest.tagId}" + } +} diff --git a/src/test/groovy/org/prebid/server/functional/tests/storage/AuctionS3Spec.groovy b/src/test/groovy/org/prebid/server/functional/tests/storage/AuctionS3Spec.groovy new file mode 100644 index 00000000000..51d39dd5af9 --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/tests/storage/AuctionS3Spec.groovy @@ -0,0 +1,117 @@ +package org.prebid.server.functional.tests.storage + +import org.prebid.server.functional.model.db.StoredImp +import org.prebid.server.functional.model.request.auction.BidRequest +import org.prebid.server.functional.model.request.auction.Imp +import org.prebid.server.functional.model.request.auction.PrebidStoredRequest +import org.prebid.server.functional.model.request.auction.SecurityLevel +import org.prebid.server.functional.service.PrebidServerException +import org.prebid.server.functional.service.S3Service +import org.prebid.server.functional.util.PBSUtils +import spock.lang.PendingFeature + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST + +class AuctionS3Spec extends StorageBaseSpec { + + def "PBS auction should populate imp[0].secure depend which value in imp stored request from S3 service"() { + given: "Default bid request" + def storedRequestId = PBSUtils.randomString + def bidRequest = BidRequest.defaultBidRequest.tap { + imp[0].tap { + it.ext.prebid.storedRequest = new PrebidStoredRequest(id: storedRequestId) + it.secure = null + } + } + + and: "Save storedImp into S3 service" + def secureStoredRequest = PBSUtils.getRandomEnum(SecurityLevel.class) + def storedImp = StoredImp.getStoredImp(bidRequest).tap { + impData = Imp.defaultImpression.tap { + secure = secureStoredRequest + } + } + s3Service.uploadStoredImp(DEFAULT_BUCKET, storedImp) + + when: "Requesting PBS auction" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "Response should contain imp[0].secure same value as in request" + def bidderRequest = bidder.getBidderRequest(bidRequest.id) + assert bidderRequest.imp[0].secure == secureStoredRequest + } + + @PendingFeature + def "PBS should throw exception when trying to populate imp[0].secure from imp stored request on S3 service with impId that doesn't matches"() { + given: "Default bid request" + def storedRequestId = PBSUtils.randomString + def bidRequest = BidRequest.defaultBidRequest.tap { + imp[0].tap { + it.ext.prebid.storedRequest = new PrebidStoredRequest(id: storedRequestId) + it.secure = null + } + } + + and: "Save storedImp with different impId into S3 service" + def secureStoredRequest = PBSUtils.getRandomNumber(0, 1) + def storedImp = StoredImp.getStoredImp(bidRequest).tap { + impId = PBSUtils.randomString + impData = Imp.defaultImpression.tap { + it.secure = secureStoredRequest + } + } + s3Service.uploadStoredImp(DEFAULT_BUCKET, storedImp, storedRequestId) + + when: "Requesting PBS auction" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "No stored impression found for id: ${storedRequestId}" + } + + def "PBS should throw exception when trying to populate imp[0].secure from invalid imp stored request on S3 service"() { + given: "Default bid request" + def storedRequestId = PBSUtils.randomString + def bidRequest = BidRequest.defaultBidRequest.tap { + imp[0].tap { + it.ext.prebid.storedRequest = new PrebidStoredRequest(id: storedRequestId) + it.secure = null + } + } + + and: "Save storedImp into S3 service" + s3Service.uploadFile(DEFAULT_BUCKET, INVALID_FILE_BODY, "${S3Service.DEFAULT_IMPS_DIR}/${storedRequestId}.json" ) + + when: "Requesting PBS auction" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "Can't parse Json for stored request with id ${storedRequestId}" + } + + def "PBS should throw exception when trying to populate imp[0].secure from unexciting imp stored request on S3 service"() { + given: "Default bid request" + def storedRequestId = PBSUtils.randomString + def bidRequest = BidRequest.defaultBidRequest.tap { + imp[0].tap { + it.ext.prebid.storedRequest = new PrebidStoredRequest(id: storedRequestId) + it.secure = null + } + } + + when: "Requesting PBS auction" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Stored request processing failed: " + + "No stored impression found for id: ${storedRequestId}" + } +} diff --git a/src/test/groovy/org/prebid/server/functional/tests/storage/StorageBaseSpec.groovy b/src/test/groovy/org/prebid/server/functional/tests/storage/StorageBaseSpec.groovy new file mode 100644 index 00000000000..583d6d97e06 --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/tests/storage/StorageBaseSpec.groovy @@ -0,0 +1,56 @@ +package org.prebid.server.functional.tests.storage + +import org.prebid.server.functional.service.PrebidServerService +import org.prebid.server.functional.service.S3Service +import org.prebid.server.functional.testcontainers.Dependencies +import org.prebid.server.functional.testcontainers.PbsServiceFactory +import org.prebid.server.functional.tests.BaseSpec +import org.prebid.server.functional.util.PBSUtils + +class StorageBaseSpec extends BaseSpec { + + protected static final String INVALID_FILE_BODY = 'INVALID' + protected static final String DEFAULT_BUCKET = PBSUtils.randomString.toLowerCase() + + protected static final S3Service s3Service = new S3Service(Dependencies.localStackContainer) + + def setupSpec() { + s3Service.createBucket(DEFAULT_BUCKET) + } + + def cleanupSpec() { + s3Service.purgeBucketFiles(DEFAULT_BUCKET) + s3Service.deleteBucket(DEFAULT_BUCKET) + } + + protected static Map s3StorageConfig = [ + 'settings.s3.accessKeyId' : s3Service.accessKeyId, + 'settings.s3.secretAccessKey' : s3Service.secretKeyId, + 'settings.s3.endpoint' : s3Service.endpoint, + 'settings.s3.bucket' : DEFAULT_BUCKET, + 'settings.s3.region' : s3Service.region, + 'settings.s3.force-path-style' : 'true', + 'settings.s3.accounts-dir' : S3Service.DEFAULT_ACCOUNT_DIR, + 'settings.s3.stored-imps-dir' : S3Service.DEFAULT_IMPS_DIR, + 'settings.s3.stored-requests-dir' : S3Service.DEFAULT_REQUEST_DIR, + 'settings.s3.stored-responses-dir': S3Service.DEFAULT_RESPONSE_DIR, + ] + + protected static Map mySqlDisabledConfig = + ['settings.database.type' : null, + 'settings.database.host' : null, + 'settings.database.port' : null, + 'settings.database.dbname' : null, + 'settings.database.user' : null, + 'settings.database.password' : null, + 'settings.database.pool-size' : null, + 'settings.database.provider-class' : null, + 'settings.database.account-query' : null, + 'settings.database.stored-requests-query' : null, + 'settings.database.amp-stored-requests-query': null, + 'settings.database.stored-responses-query' : null + ].asImmutable() as Map + + + protected PrebidServerService s3StoragePbsService = PbsServiceFactory.getService(s3StorageConfig + mySqlDisabledConfig) +} diff --git a/src/test/groovy/org/prebid/server/functional/tests/storage/StoredResponseS3Spec.groovy b/src/test/groovy/org/prebid/server/functional/tests/storage/StoredResponseS3Spec.groovy new file mode 100644 index 00000000000..e07b5b71f2e --- /dev/null +++ b/src/test/groovy/org/prebid/server/functional/tests/storage/StoredResponseS3Spec.groovy @@ -0,0 +1,99 @@ +package org.prebid.server.functional.tests.storage + +import org.prebid.server.functional.model.db.StoredResponse +import org.prebid.server.functional.model.request.auction.BidRequest +import org.prebid.server.functional.model.request.auction.StoredAuctionResponse +import org.prebid.server.functional.model.response.auction.SeatBid +import org.prebid.server.functional.service.PrebidServerException +import org.prebid.server.functional.service.S3Service +import org.prebid.server.functional.util.PBSUtils +import spock.lang.PendingFeature + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST + +class StoredResponseS3Spec extends StorageBaseSpec { + + def "PBS should return info from S3 stored auction response when it defined in request"() { + given: "Default basic BidRequest with stored response" + def bidRequest = BidRequest.defaultBidRequest + def storedResponseId = PBSUtils.randomNumber + bidRequest.imp[0].ext.prebid.storedAuctionResponse = new StoredAuctionResponse(id: storedResponseId) + + and: "Stored auction response in S3 storage" + def storedAuctionResponse = SeatBid.getStoredResponse(bidRequest) + def storedResponse = new StoredResponse(responseId: storedResponseId, + storedAuctionResponse: storedAuctionResponse) + s3Service.uploadStoredResponse(DEFAULT_BUCKET, storedResponse) + + when: "PBS processes auction request" + def response = s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "Response should contain information from stored auction response" + assert response.id == bidRequest.id + assert response.seatbid[0]?.seat == storedAuctionResponse.seat + assert response.seatbid[0]?.bid?.size() == storedAuctionResponse.bid.size() + assert response.seatbid[0]?.bid[0]?.impid == storedAuctionResponse.bid[0].impid + assert response.seatbid[0]?.bid[0]?.price == storedAuctionResponse.bid[0].price + assert response.seatbid[0]?.bid[0]?.id == storedAuctionResponse.bid[0].id + + and: "PBS not send request to bidder" + assert !bidder.getRequestCount(bidRequest.id) + } + + @PendingFeature + def "PBS should throw request format exception when stored auction response id isn't match with requested response id"() { + given: "Default basic BidRequest with stored response" + def bidRequest = BidRequest.defaultBidRequest + def storedResponseId = PBSUtils.randomNumber + bidRequest.imp[0].ext.prebid.storedAuctionResponse = new StoredAuctionResponse(id: storedResponseId) + + and: "Stored auction response in S3 storage with different id" + def storedAuctionResponse = SeatBid.getStoredResponse(bidRequest) + def storedResponse = new StoredResponse(responseId: PBSUtils.randomNumber, + storedAuctionResponse: storedAuctionResponse) + s3Service.uploadStoredResponse(DEFAULT_BUCKET, storedResponse, storedResponseId as String) + + when: "PBS processes auction request" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Failed to fetch stored auction response for " + + "impId = ${bidRequest.imp[0].id} and storedAuctionResponse id = ${storedResponseId}." + } + + def "PBS should throw request format exception when invalid stored auction response defined in S3 storage"() { + given: "Default basic BidRequest with stored response" + def bidRequest = BidRequest.defaultBidRequest + def storedResponseId = PBSUtils.randomNumber + bidRequest.imp[0].ext.prebid.storedAuctionResponse = new StoredAuctionResponse(id: storedResponseId) + + and: "Invalid stored auction response in S3 storage" + s3Service.uploadFile(DEFAULT_BUCKET, INVALID_FILE_BODY, "${S3Service.DEFAULT_RESPONSE_DIR}/${storedResponseId}.json") + + when: "PBS processes auction request" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Can't parse Json for stored response with id ${storedResponseId}" + } + + def "PBS should throw request format exception when stored auction response defined in request but not defined in S3 storage"() { + given: "Default basic BidRequest with stored response" + def bidRequest = BidRequest.defaultBidRequest + def storedResponseId = PBSUtils.randomNumber + bidRequest.imp[0].ext.prebid.storedAuctionResponse = new StoredAuctionResponse(id: storedResponseId) + + when: "PBS processes auction request" + s3StoragePbsService.sendAuctionRequest(bidRequest) + + then: "PBS should throw request format error" + def exception = thrown(PrebidServerException) + assert exception.statusCode == BAD_REQUEST.code() + assert exception.responseBody == "Invalid request format: Failed to fetch stored auction response for " + + "impId = ${bidRequest.imp[0].id} and storedAuctionResponse id = ${storedResponseId}." + } +} diff --git a/src/test/java/org/prebid/server/auction/BidResponseCreatorTest.java b/src/test/java/org/prebid/server/auction/BidResponseCreatorTest.java index 13944033ed1..674a3fcf245 100644 --- a/src/test/java/org/prebid/server/auction/BidResponseCreatorTest.java +++ b/src/test/java/org/prebid/server/auction/BidResponseCreatorTest.java @@ -1223,7 +1223,6 @@ public void shouldReturnEmptyAssetIfNoRelatedNativeAssetFound() throws JsonProce final BidResponse bidResponse = target.create(auctionContext, CACHE_INFO, MULTI_BIDS).result(); // then - assertThat(bidResponse.getSeatbid()).hasSize(1) .flatExtracting(SeatBid::getBid) .extracting(Bid::getAdm) @@ -1283,7 +1282,6 @@ public void shouldReturnEmptyAssetIfIdIsNotPresentRelatedNativeAssetFound() thro final BidResponse bidResponse = target.create(auctionContext, CACHE_INFO, MULTI_BIDS).result(); // then - assertThat(bidResponse.getSeatbid()).hasSize(1) .flatExtracting(SeatBid::getBid) .extracting(Bid::getAdm) diff --git a/src/test/java/org/prebid/server/bidder/algorix/AlgorixBidderTest.java b/src/test/java/org/prebid/server/bidder/algorix/AlgorixBidderTest.java index 52495c635da..ab68ca9a517 100644 --- a/src/test/java/org/prebid/server/bidder/algorix/AlgorixBidderTest.java +++ b/src/test/java/org/prebid/server/bidder/algorix/AlgorixBidderTest.java @@ -67,7 +67,6 @@ public void makeHttpRequestsShouldReturnErrorIfImpExtCouldNotBeParsed() { @Test public void makeHttpRequestsShouldReturnErrorOfEveryNotValidImp() { // given - final BidRequest bidRequest = BidRequest.builder() .imp(asList(Imp.builder() .id("123") diff --git a/src/test/java/org/prebid/server/bidder/mobilefuse/MobilefuseBidderTest.java b/src/test/java/org/prebid/server/bidder/mobilefuse/MobilefuseBidderTest.java index 9ad08ecb30a..6de8d3f4d65 100644 --- a/src/test/java/org/prebid/server/bidder/mobilefuse/MobilefuseBidderTest.java +++ b/src/test/java/org/prebid/server/bidder/mobilefuse/MobilefuseBidderTest.java @@ -221,7 +221,6 @@ public void makeHttpRequestsShouldModifyImpWithAddingSkadnWhenSkadnIsPresent() { final Result>> result = target.makeHttpRequests(bidRequest); // then - final ObjectNode expectedImpExt = mapper.createObjectNode().set("skadn", skadn); assertThat(result.getErrors()).isEmpty(); assertThat(result.getValue()).hasSize(1) diff --git a/src/test/java/org/prebid/server/bidder/rtbhouse/RtbhouseBidderTest.java b/src/test/java/org/prebid/server/bidder/rtbhouse/RtbhouseBidderTest.java index a31a943c0e2..720f970bff6 100644 --- a/src/test/java/org/prebid/server/bidder/rtbhouse/RtbhouseBidderTest.java +++ b/src/test/java/org/prebid/server/bidder/rtbhouse/RtbhouseBidderTest.java @@ -184,7 +184,6 @@ public void makeHttpRequestsShouldConvertCurrencyIfRequestCurrencyDoesNotMatchBi @Test public void makeHttpRequestsShouldTakePriceFloorsWhenBidfloorParamIsAlsoPresent() { // given - final BidRequest bidRequest = BidRequest.builder() .imp(singletonList(Imp.builder() .bidfloor(BigDecimal.TEN).bidfloorcur("USD") @@ -209,7 +208,6 @@ public void makeHttpRequestsShouldTakePriceFloorsWhenBidfloorParamIsAlsoPresent( @Test public void makeHttpRequestsShouldTakeBidfloorExtImpParamIfNoBidfloorInRequest() { // given - final BidRequest bidRequest = BidRequest.builder() .imp(singletonList(Imp.builder() .ext(mapper.valueToTree(ExtPrebid.of(null, diff --git a/src/test/java/org/prebid/server/bidder/smaato/SmaatoBidderTest.java b/src/test/java/org/prebid/server/bidder/smaato/SmaatoBidderTest.java index 24531719f48..18b1ba36e8a 100644 --- a/src/test/java/org/prebid/server/bidder/smaato/SmaatoBidderTest.java +++ b/src/test/java/org/prebid/server/bidder/smaato/SmaatoBidderTest.java @@ -921,7 +921,6 @@ public void makeBidsShouldReturnCorrectBidIfAdMarkTypeIsNative() throws JsonProc final Result> result = target.makeBids(httpCall, null); // then - final String expectedAdm = "{\"assets\":[{\"id\":1,\"img\":{\"type\":3," + "\"url\":\"https://smaato.com/image.png\",\"w\":480,\"h\":320}}]," + "\"link\":{\"url\":\"https://www.smaato.com\"}}"; diff --git a/src/test/java/org/prebid/server/cache/CoreCacheServiceTest.java b/src/test/java/org/prebid/server/cache/CoreCacheServiceTest.java index 8c35236c361..8a55773a0c0 100644 --- a/src/test/java/org/prebid/server/cache/CoreCacheServiceTest.java +++ b/src/test/java/org/prebid/server/cache/CoreCacheServiceTest.java @@ -646,7 +646,6 @@ public void cacheBidsOpenrtbShouldNotUpdateVastXmlPutObjectWithKeyWhenDoesNotHav @Test public void cacheBidsOpenrtbShouldRemoveCatDurPrefixFromVideoUuidFromResponse() throws IOException { // given - givenHttpClientReturnsResponse(200, mapper.writeValueAsString( BidCacheResponse.of(asList(CacheObject.of("uuid"), CacheObject.of("catDir_randomId"))))); final BidInfo bidInfo1 = givenBidInfo(builder -> builder.id("bid1").impid("impId1").adm("adm"), diff --git a/src/test/java/org/prebid/server/floors/BasicPriceFloorProcessorTest.java b/src/test/java/org/prebid/server/floors/BasicPriceFloorProcessorTest.java index 70835b4286a..0990d97b5aa 100644 --- a/src/test/java/org/prebid/server/floors/BasicPriceFloorProcessorTest.java +++ b/src/test/java/org/prebid/server/floors/BasicPriceFloorProcessorTest.java @@ -493,7 +493,6 @@ public void shouldNotUpdateImpsIfBidFloorNotResolved() { @Test public void shouldUpdateImpsIfBidFloorResolved() { // given - final PriceFloorRules requestFloors = givenFloors(floors -> floors .data(givenFloorData(floorData -> floorData .modelGroups(singletonList(givenModelGroup(identity())))))); @@ -511,7 +510,6 @@ public void shouldUpdateImpsIfBidFloorResolved() { .willReturn(PriceFloorResult.of("rule", BigDecimal.ONE, BigDecimal.TEN, "USD")); // when - final BidRequest result = target.enrichWithPriceFloors( givenBidRequest(request -> request.imp(imps), requestFloors), givenAccount(identity()), diff --git a/src/test/java/org/prebid/server/handler/openrtb2/AmpHandlerTest.java b/src/test/java/org/prebid/server/handler/openrtb2/AmpHandlerTest.java index a5af550a562..9f40ed1bc81 100644 --- a/src/test/java/org/prebid/server/handler/openrtb2/AmpHandlerTest.java +++ b/src/test/java/org/prebid/server/handler/openrtb2/AmpHandlerTest.java @@ -726,7 +726,6 @@ public void shouldIncrementErrAmpRequestMetrics() { @Test public void shouldUpdateRequestTimeMetric() { // given - // set up clock mock to check that request_time metric has been updated with expected value given(clock.millis()).willReturn(5000L).willReturn(5500L); diff --git a/src/test/java/org/prebid/server/handler/openrtb2/AuctionHandlerTest.java b/src/test/java/org/prebid/server/handler/openrtb2/AuctionHandlerTest.java index 87396cca90d..c2a84bcc922 100644 --- a/src/test/java/org/prebid/server/handler/openrtb2/AuctionHandlerTest.java +++ b/src/test/java/org/prebid/server/handler/openrtb2/AuctionHandlerTest.java @@ -587,7 +587,6 @@ public void shouldIncrementErrOpenrtb2WebRequestMetrics() { @Test public void shouldUpdateRequestTimeMetric() { // given - // set up clock mock to check that request_time metric has been updated with expected value given(clock.millis()).willReturn(5000L).willReturn(5500L); diff --git a/src/test/java/org/prebid/server/it/AdnuntiusTest.java b/src/test/java/org/prebid/server/it/AdnuntiusTest.java index 23bb6000371..9a03d73ccf5 100644 --- a/src/test/java/org/prebid/server/it/AdnuntiusTest.java +++ b/src/test/java/org/prebid/server/it/AdnuntiusTest.java @@ -18,7 +18,6 @@ public class AdnuntiusTest extends IntegrationTest { @Test public void openrtb2AuctionShouldRespondWithBidsFromAdnuntius() throws IOException, JSONException { // given - WIRE_MOCK_RULE.stubFor(post(urlPathEqualTo("/adnuntius-exchange")) .withRequestBody(equalToJson(jsonFrom("openrtb2/adnuntius/test-adnuntius-bid-request.json"))) .willReturn(aResponse().withBody(jsonFrom("openrtb2/adnuntius/test-adnuntius-bid-response.json")))); diff --git a/src/test/java/org/prebid/server/it/MinuteMediaTest.java b/src/test/java/org/prebid/server/it/MinuteMediaTest.java index 0aace384eb5..2f8c591dc08 100644 --- a/src/test/java/org/prebid/server/it/MinuteMediaTest.java +++ b/src/test/java/org/prebid/server/it/MinuteMediaTest.java @@ -19,7 +19,6 @@ public class MinuteMediaTest extends IntegrationTest { @Test public void openrtb2AuctionShouldRespondWithBidsFromMinuteMedia() throws IOException, JSONException { // given - WIRE_MOCK_RULE.stubFor(post(urlPathEqualTo("/minutemedia-exchange")) .withQueryParam("publisherId", equalTo("123")) .withRequestBody(equalToJson(jsonFrom("openrtb2/minutemedia/test-minutemedia-bid-request.json"))) diff --git a/src/test/java/org/prebid/server/it/PrecisoTest.java b/src/test/java/org/prebid/server/it/PrecisoTest.java index 5d97978e7b7..da461016e64 100644 --- a/src/test/java/org/prebid/server/it/PrecisoTest.java +++ b/src/test/java/org/prebid/server/it/PrecisoTest.java @@ -23,8 +23,8 @@ public void openrtb2AuctionShouldRespondWithBidsFromPreciso() throws IOException "openrtb2/preciso/test-preciso-bid-request.json"))) .willReturn(aResponse().withBody(jsonFrom( "openrtb2/preciso/test-preciso-bid-response.json")))); - // when + // when final Response response = responseFor("openrtb2/preciso/test-auction-preciso-request.json", Endpoint.openrtb2_auction); diff --git a/src/test/java/org/prebid/server/settings/CachingApplicationSettingsTest.java b/src/test/java/org/prebid/server/settings/CachingApplicationSettingsTest.java index bac345c8906..d09df3327a8 100644 --- a/src/test/java/org/prebid/server/settings/CachingApplicationSettingsTest.java +++ b/src/test/java/org/prebid/server/settings/CachingApplicationSettingsTest.java @@ -336,7 +336,6 @@ public void getCategoriesShouldNotCacheNotPreBidException() { .willReturn(Future.failedFuture(new TimeoutException("timeout"))); // when - target.getCategories("adServer", "publisher", timeout); target.getCategories("adServer", "publisher", timeout); final Future> lastFuture = diff --git a/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java new file mode 100644 index 00000000000..2f7c293f9f8 --- /dev/null +++ b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java @@ -0,0 +1,403 @@ +package org.prebid.server.settings; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.prebid.server.VertxTest; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.settings.model.StoredResponseDataResult; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +@ExtendWith(VertxExtension.class) +public class S3ApplicationSettingsTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String ACCOUNTS_DIR = "accounts"; + private static final String STORED_IMPS_DIR = "stored-imps"; + private static final String STORED_REQUESTS_DIR = "stored-requests"; + private static final String STORED_RESPONSES_DIR = "stored-responses"; + + @Mock + private S3AsyncClient s3AsyncClient; + + private Vertx vertx; + + private S3ApplicationSettings target; + + @Mock + private Timeout timeout; + + @BeforeEach + public void setUp() { + vertx = Vertx.vertx(); + target = new S3ApplicationSettings( + s3AsyncClient, + BUCKET, + ACCOUNTS_DIR, + STORED_IMPS_DIR, + STORED_REQUESTS_DIR, + STORED_RESPONSES_DIR, + jacksonMapper, + vertx); + + given(timeout.remaining()).willReturn(500L); + } + + @AfterEach + public void tearDown(VertxTestContext context) { + vertx.close(context.succeedingThenComplete()); + } + + @Test + public void getAccountByIdShouldReturnFetchedAccount(VertxTestContext context) throws JsonProcessingException { + // given + final Account account = Account.builder().id("accountId").build(); + + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(ACCOUNTS_DIR, "accountId")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + mapper.writeValueAsString(account).getBytes()))); + + // when + final Future result = target.getAccountById("accountId", timeout); + + // then + result.onComplete(context.succeeding(returnedAccount -> { + assertThat(returnedAccount.getId()).isEqualTo("accountId"); + context.completeNow(); + })); + } + + @Test + public void getAccountByIdShouldReturnTimeout(VertxTestContext context) { + // given + given(timeout.remaining()).willReturn(-1L); + + // when + final Future result = target.getAccountById("account", timeout); + + // then + result.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(TimeoutException.class) + .hasMessage("Timeout has been exceeded"); + + context.completeNow(); + })); + } + + @Test + public void getAccountByIdShouldReturnAccountNotFound(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("error")))); + + // when + final Future result = target.getAccountById("notFoundId", timeout); + + // then + result.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Account with id notFoundId not found"); + + context.completeNow(); + })); + } + + @Test + public void getAccountByIdShouldReturnInvalidJson(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "invalidJson".getBytes()))); + + // when + final Future result = target.getAccountById("invalidJsonId", timeout); + + // then + result.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Invalid json for account with id invalidJsonId"); + + context.completeNow(); + })); + } + + @Test + public void getAccountByIdShouldReturnAccountIdMismatch(VertxTestContext context) throws JsonProcessingException { + // given + final Account account = Account.builder().id("accountId").build(); + + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(ACCOUNTS_DIR, "anotherAccountId")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + mapper.writeValueAsString(account).getBytes()))); + + // when + final Future result = target.getAccountById("anotherAccountId", timeout); + + // then + result.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Account with id anotherAccountId does not match id accountId in file"); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredRequest(VertxTestContext context) { + // given + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_REQUESTS_DIR, "request")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedRequest".getBytes()))); + + // when + final Future result = target.getStoredData( + "accountId", Set.of("request"), emptySet(), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToRequest()).isEqualTo(Map.of("request", "storedRequest")); + assertThat(storedDataResult.getStoredIdToImp()).isEmpty(); + assertThat(storedDataResult.getErrors()).isEmpty(); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpression(VertxTestContext context) { + // given + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_IMPS_DIR, "imp")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedImp".getBytes()))); + + // when + final Future result = target.getStoredData( + "accountId", emptySet(), Set.of("imp"), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToRequest()).isEmpty(); + assertThat(storedDataResult.getStoredIdToImp()).isEqualTo(Map.of("imp", "storedImp")); + assertThat(storedDataResult.getErrors()).isEmpty(); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpressionWithAdUnitPath(VertxTestContext context) { + // given + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_IMPS_DIR, "imp")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedImp".getBytes()))); + + // when + final Future result = target.getStoredData("accountId", emptySet(), Set.of("/imp"), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToRequest()).isEmpty(); + assertThat(storedDataResult.getStoredIdToImp()).isEqualTo(Map.of("/imp", "storedImp")); + assertThat(storedDataResult.getErrors()).isEmpty(); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredRequestAndStoredImpression(VertxTestContext context) { + // given + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_REQUESTS_DIR, "request")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedRequest".getBytes()))); + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_IMPS_DIR, "imp")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedImp".getBytes()))); + + // when + final Future result = target.getStoredData( + "accountId", Set.of("request"), Set.of("imp"), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToRequest()).isEqualTo(Map.of("request", "storedRequest")); + assertThat(storedDataResult.getStoredIdToImp()).isEqualTo(Map.of("imp", "storedImp")); + assertThat(storedDataResult.getErrors()).isEmpty(); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnErrorsForNotFoundRequests(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("error")))); + + // when + final Future result = target.getStoredData( + "accountId", Set.of("request"), emptySet(), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToImp()).isEmpty(); + assertThat(storedDataResult.getStoredIdToRequest()).isEmpty(); + assertThat(storedDataResult.getErrors()) + .isEqualTo(singletonList("No stored request found for id: request")); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnErrorsForNotFoundImpressions(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("error")))); + + // when + final Future result = target.getStoredData( + "accountId", emptySet(), Set.of("imp"), timeout); + + // then + result.onComplete(context.succeeding(storedDataResult -> { + assertThat(storedDataResult.getStoredIdToImp()).isEmpty(); + assertThat(storedDataResult.getStoredIdToRequest()).isEmpty(); + assertThat(storedDataResult.getErrors()).isEqualTo(singletonList("No stored impression found for id: imp")); + + context.completeNow(); + })); + } + + @Test + public void getStoredResponsesShouldReturnExpectedResult(VertxTestContext context) { + // given + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_RESPONSES_DIR, "response1")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "storedResponse1".getBytes()))); + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key("%s/%s.json".formatted(STORED_RESPONSES_DIR, "response2")) + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("error")))); + + // when + final Future result = target.getStoredResponses( + Set.of("response1", "response2"), timeout); + + // then + result.onComplete(context.succeeding(storedResponseDataResult -> { + assertThat(storedResponseDataResult.getIdToStoredResponses()) + .isEqualTo(Map.of("response1", "storedResponse1")); + assertThat(storedResponseDataResult.getErrors()) + .isEqualTo(singletonList("No stored response found for id: response2")); + + context.completeNow(); + })); + } +} diff --git a/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java new file mode 100644 index 00000000000..e9b37a75d94 --- /dev/null +++ b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java @@ -0,0 +1,174 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.prebid.server.VertxTest; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Clock; +import java.util.concurrent.CompletableFuture; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mock.Strictness.LENIENT; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +@ExtendWith(VertxExtension.class) +public class S3PeriodicRefreshServiceTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String STORED_REQ_DIR = "stored-req"; + private static final String STORED_IMP_DIR = "stored-imp"; + + @Mock(strictness = LENIENT) + private S3AsyncClient s3AsyncClient; + + @Mock + private CacheNotificationListener cacheNotificationListener; + + @Mock + private Clock clock; + + @Mock + private Metrics metrics; + + private Vertx vertx; + + @BeforeEach + public void setUp() { + vertx = spy(Vertx.vertx()); + + given(s3AsyncClient.listObjects(eq(ListObjectsRequest.builder() + .bucket(BUCKET) + .prefix(STORED_REQ_DIR) + .build()))) + .willReturn(listObjectResponse(STORED_REQ_DIR + "/id1.json")); + given(s3AsyncClient.listObjects(eq(ListObjectsRequest.builder() + .bucket(BUCKET) + .prefix(STORED_IMP_DIR) + .build()))) + .willReturn(listObjectResponse(STORED_IMP_DIR + "/id2.json")); + + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key(STORED_REQ_DIR + "/id1.json") + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(getObjectResponse("value1")); + given(s3AsyncClient.getObject( + eq(GetObjectRequest.builder() + .bucket(BUCKET) + .key(STORED_IMP_DIR + "/id2.json") + .build()), + any(AsyncResponseTransformer.class))) + .willReturn(getObjectResponse("value2")); + + given(clock.millis()).willReturn(100L, 500L); + } + + @AfterEach + public void tearDown(VertxTestContext context) { + vertx.close(context.succeedingThenComplete()); + } + + @Test + public void initializeShouldCallSaveWithExpectedParameters(VertxTestContext context) { + // when and then + createAndInitService(100).onComplete(context.succeeding(ignored -> { + verify(cacheNotificationListener, atLeast(1)) + .save(singletonMap("id1", "value1"), singletonMap("id2", "value2")); + verify(metrics, atLeast(1)).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), eq(400L)); + + context.completeNow(); + })); + } + + @Test + public void initializeShouldNotCreatePeriodicTaskIfRefreshPeriodIsNegative(VertxTestContext context) { + // when and then + createAndInitService(-1).onComplete(context.succeeding(unused -> { + verify(vertx, never()).setPeriodic(anyLong(), any()); + + context.completeNow(); + })); + } + + @Test + public void initializeShouldUpdateMetricsOnError(VertxTestContext context) { + // given + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(CompletableFuture.failedFuture(new IllegalStateException("Failed"))); + + // when + createAndInitService(100).onComplete(context.failing(ignored -> { + verify(metrics, atLeast(1)).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), eq(400L)); + verify(metrics, atLeast(1)).updateSettingsCacheRefreshErrorMetric( + eq(MetricName.stored_request), eq(MetricName.initialize)); + + context.completeNow(); + })); + } + + private CompletableFuture listObjectResponse(String key) { + return CompletableFuture.completedFuture( + ListObjectsResponse + .builder() + .contents(singletonList(S3Object.builder().key(key).build())) + .build()); + } + + private CompletableFuture> getObjectResponse(String value) { + return CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + value.getBytes())); + } + + private Future createAndInitService(long refreshPeriod) { + final S3PeriodicRefreshService s3PeriodicRefreshService = new S3PeriodicRefreshService( + s3AsyncClient, + BUCKET, + STORED_REQ_DIR, + STORED_IMP_DIR, + refreshPeriod, + cacheNotificationListener, + MetricName.stored_request, + clock, + metrics, + vertx); + + final Promise init = Promise.promise(); + s3PeriodicRefreshService.initialize(init); + return init.future(); + } +}