Skip to content

Commit

Permalink
Update processes to have s3BucketIdentifier key as input
Browse files Browse the repository at this point in the history
- Add migration to update all existing processes in `processes_table` to have the mentioned input key
- Update `ProcessRunnerImpl`
	- Change `validateInput` logic to send back a 400 Bad Request error in case of missing/empty keys, along with the offending keys
	- Checking for the `s3BucketIdentifier` key from input now
- Update process tests
	- Update the onboarding, appending and tiles metadata onboarding tests to add the extra input key `s3BucketIdentifier`
	with value `default`
  • Loading branch information
ThorodanBrom committed Feb 18, 2025
1 parent a66ef92 commit bc03412
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 80 deletions.
173 changes: 93 additions & 80 deletions src/main/java/ogc/rs/processes/ProcessesRunnerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import ogc.rs.processes.util.UtilClass;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static ogc.rs.common.Constants.processException404;
import static ogc.rs.common.Constants.processException500;
import static ogc.rs.processes.util.Constants.PROCESS_EXIST_CHECK_QUERY;
Expand Down Expand Up @@ -75,19 +77,11 @@ public class ProcessesRunnerImpl implements ProcessesRunnerService {
* @throws OgcException
*/
private S3Config getS3Config(JsonObject input) throws OgcException {
/*
* TODO: Once processes are modified to have bucket identifier, uncomment this
* out. This will check if the particular process has a particular bucket identifier key.
* Also delete the DEFAULT_BUCKET_IDENTIFIER line.
*
if(!input.containsKey(S3_BUCKET_IDENTIFIER_PROCESS_INPUT_KEY)) {
return null;
}

String s3BucketIdentifier = input.getString(S3_BUCKET_IDENTIFIER_PROCESS_INPUT_KEY);
*
*/
String s3BucketIdentifier = S3ConfigsHolder.DEFAULT_BUCKET_IDENTIFIER;

Optional<S3Config> conf = s3conf.getConfigByIdentifier(s3BucketIdentifier);

Expand Down Expand Up @@ -117,77 +111,94 @@ public ProcessesRunnerService run(JsonObject input, Handler<AsyncResult<JsonObje
boolean isAsync = processExist.getJsonArray("response")
.stream()
.anyMatch(item -> item.toString().equalsIgnoreCase("ASYNC"));
boolean validateInput = validateInput(input, processExist);
List<String> validateInput = validateInput(input, processExist);

S3Config processSpecificS3Conf = getS3Config(input);

if (validateInput) {
ProcessService processService;

// Switch case to handle different processes
switch (processName) {
case "CollectionOnboarding":
processService = new CollectionOnboardingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "CollectionAppending":
processService = new CollectionAppendingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "S3PreSignedURLGeneration":
processService = new S3PreSignedURLGenerationProcess(pgPool, webClient, config, processSpecificS3Conf);
break;
case "TilesMetaDataOnboarding":
processService = new TilesMetaDataOnboardingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "TilesOnboardingFromExistingFeature":
processService = new TilesOnboardingFromExistingFeatureProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
default:
LOGGER.error("No method specified for process {}", processName);
handler.handle(Future.failedFuture("Process could not be executed: no method specified for process " + processName));
return;
}
S3Config processSpecificS3Conf;

try {
processSpecificS3Conf = getS3Config(input);
} catch (OgcException e) {
LOGGER.error(
"Failed to start process {}, S3 bucket identifier {} was not found in config ",
processName, input.getString(S3_BUCKET_IDENTIFIER_PROCESS_INPUT_KEY));
handler.handle(Future.failedFuture(e));
return;
}

if (!validateInput.isEmpty()) {
LOGGER.error(
"Input validation failed for process {}, following keys were missing or empty {}",
processName, validateInput);

handler.handle(Future.failedFuture(new OgcException(400,
"Failed to start process : input validation failed",
"Following keys in process input were missing or empty " + validateInput.toString())));

return;
}

ProcessService finalProcessService = processService;
Future<JsonObject> startAJobInDB = utilClass.startAJobInDB(input);

startAJobInDB.onSuccess(jobStarted -> {
if (isAsync) {
// Handle async process
LOGGER.info("Async Job started in DB with jobId {} for process with processId {}",
jobStarted.getValue("jobId"), input.getString("processId"));
handler.handle(Future.succeededFuture(
new JsonObject()
.put("jobId", jobStarted.getValue("jobId"))
.put("processId", input.getString("processId"))
.put("type", "PROCESS")
.put("status", Status.ACCEPTED)
.put("location",
config.getString("hostName")
.concat("/jobs/")
.concat(jobStarted.getString("jobId")))
));
finalProcessService.execute(input); // Start async process
} else {
// Handle sync process
LOGGER.info("Sync Job started in DB with jobId {} for process with processId {}",
jobStarted.getValue("jobId"), input.getString("processId"));
finalProcessService.execute(input).onSuccess(result -> {
JsonObject response = result.copy();
response.put("sync", "true");
response.put("status", Status.SUCCESSFUL);
response.put("location",
config.getString("hostName")
.concat("/jobs/")
.concat(jobStarted.getString("jobId")));
handler.handle(Future.succeededFuture(response));
}).onFailure(failureHandler -> handler.handle(Future.failedFuture(failureHandler)));
}
}).onFailure(jobFailed -> handler.handle(Future.failedFuture(jobFailed.getMessage())));

} else {
LOGGER.error("Failed to validate the input");
handler.handle(Future.failedFuture(processException500));
ProcessService processService;

// Switch case to handle different processes
switch (processName) {
case "CollectionOnboarding":
processService = new CollectionOnboardingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "CollectionAppending":
processService = new CollectionAppendingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "S3PreSignedURLGeneration":
processService = new S3PreSignedURLGenerationProcess(pgPool, webClient, config, processSpecificS3Conf);
break;
case "TilesMetaDataOnboarding":
processService = new TilesMetaDataOnboardingProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
case "TilesOnboardingFromExistingFeature":
processService = new TilesOnboardingFromExistingFeatureProcess(pgPool, webClient, config, processSpecificS3Conf, vertx);
break;
default:
LOGGER.error("No method specified for process {}", processName);
handler.handle(Future.failedFuture("Process could not be executed: no method specified for process " + processName));
return;
}

ProcessService finalProcessService = processService;
Future<JsonObject> startAJobInDB = utilClass.startAJobInDB(input);

startAJobInDB.onSuccess(jobStarted -> {
if (isAsync) {
// Handle async process
LOGGER.info("Async Job started in DB with jobId {} for process with processId {}",
jobStarted.getValue("jobId"), input.getString("processId"));
handler.handle(Future.succeededFuture(
new JsonObject()
.put("jobId", jobStarted.getValue("jobId"))
.put("processId", input.getString("processId"))
.put("type", "PROCESS")
.put("status", Status.ACCEPTED)
.put("location",
config.getString("hostName")
.concat("/jobs/")
.concat(jobStarted.getString("jobId")))
));
finalProcessService.execute(input); // Start async process
} else {
// Handle sync process
LOGGER.info("Sync Job started in DB with jobId {} for process with processId {}",
jobStarted.getValue("jobId"), input.getString("processId"));
finalProcessService.execute(input).onSuccess(result -> {
JsonObject response = result.copy();
response.put("sync", "true");
response.put("status", Status.SUCCESSFUL);
response.put("location",
config.getString("hostName")
.concat("/jobs/")
.concat(jobStarted.getString("jobId")));
handler.handle(Future.succeededFuture(response));
}).onFailure(failureHandler -> handler.handle(Future.failedFuture(failureHandler)));
}
}).onFailure(jobFailed -> handler.handle(Future.failedFuture(jobFailed.getMessage())));

}).onFailure(processNotExist -> handler.handle(Future.failedFuture(processNotExist)));
return this;
}
Expand Down Expand Up @@ -226,13 +237,15 @@ private Future<JsonObject> processExistCheck(JsonObject input) {
*
* @param requestBody the input JSON containing request data
* @param processInputs the JSON containing the required input fields for the process
* @return {@code true} if the input is valid, {@code false} otherwise
* @return empty list if all inputs are present and non-empty, else list with input names that are missing or empty
*/
private boolean validateInput(JsonObject requestBody, JsonObject processInputs) {
private List<String> validateInput(JsonObject requestBody, JsonObject processInputs) {
Set<String> processInputKeys = processInputs.getJsonObject("input", new JsonObject())
.getJsonObject("inputs", new JsonObject()).getMap().keySet();
return processInputKeys.stream().allMatch(
inputKey -> requestBody.containsKey(inputKey) && !requestBody.getString(inputKey).isEmpty());

return processInputKeys.stream().filter(
inputKey -> !requestBody.containsKey(inputKey) || requestBody.getString(inputKey).isEmpty())
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Add the "s3BucketIdentifier" input key to every existing process that operates on S3,
-- so that ProcessRunnerImpl's input validation can require it.
-- COALESCE guards rows whose input has no 'inputs' object yet: jsonb_set is STRICT,
-- so without the guard (input -> 'inputs') would be NULL, the concatenation would be
-- NULL, and the whole input column for that row would be overwritten with NULL.
UPDATE processes_table
SET input = jsonb_set(
        input,
        '{inputs}',
        COALESCE(input -> 'inputs', '{}'::jsonb)
            || '{"s3BucketIdentifier" : "Identifier of the S3 bucket that the process will operate upon"}'::jsonb
    )
WHERE title = ANY('{CollectionOnboarding,CollectionAppending,TilesMetaDataOnboarding,S3PreSignedURLGeneration,TilesOnboardingFromExistingFeature}');
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.restassured.response.Response;
import io.vertx.core.json.JsonObject;
import jdk.jfr.Description;
import ogc.rs.common.S3ConfigsHolder;
import ogc.rs.util.FakeTokenBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -73,6 +74,7 @@ private JsonObject requestBody() {
inputs.put("description", "Valid file for appending test.");
inputs.put("resourceId", "61f2187e-affe-4f28-be0e-fe1cd37dbd4e");
inputs.put("version", "1.0.0");
inputs.put(S3ConfigsHolder.S3_CONFIGS_BLOCK_KEY_NAME, "default");

requestBody.put("inputs", inputs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.restassured.response.Response;
import io.vertx.core.json.JsonObject;
import jdk.jfr.Description;
import ogc.rs.common.S3ConfigsHolder;
import ogc.rs.util.FakeTokenBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -79,6 +80,7 @@ private JsonObject requestBody() {
inputs.put("description", "Invalid file for testing.");
inputs.put("resourceId", "2cfc08b8-a43d-40d4-ba98-c6fdfa76a0c1");
inputs.put("version", "1.0.0");
inputs.put(S3ConfigsHolder.S3_CONFIGS_BLOCK_KEY_NAME, "default");

requestBody.put("inputs", inputs);
requestBody.put("response", "raw");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private JsonObject requestBody() {
inputs.put("pointOfOrigin", new Double[]{-20037508.3427892, 20037508.3427892});
inputs.put("bbox", new Double[]{68.17751186879357,6.752782631992987,97.41289651394189,37.08834177335088});
inputs.put("temporal", new String[]{"2023-01-01T00:00:00Z","2023-12-31T23:59:59Z"});
inputs.put(S3ConfigsHolder.S3_CONFIGS_BLOCK_KEY_NAME, "default");

requestBody.put("inputs", inputs);

Expand Down

0 comments on commit bc03412

Please sign in to comment.