diff --git a/commons/src/main/java/com/odysseusinc/arachne/execution_engine_common/api/v1/dto/EngineStatus.java b/commons/src/main/java/com/odysseusinc/arachne/execution_engine_common/api/v1/dto/EngineStatus.java new file mode 100644 index 00000000..e8bf01d5 --- /dev/null +++ b/commons/src/main/java/com/odysseusinc/arachne/execution_engine_common/api/v1/dto/EngineStatus.java @@ -0,0 +1,23 @@ +package com.odysseusinc.arachne.execution_engine_common.api.v1.dto; + +import java.time.Instant; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class EngineStatus { + /** + * The moment when EE has started. + */ + private Instant started; + /** + * The list of + */ + private Map submissions; +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/api/v1/AnalysisController.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/api/v1/AnalysisController.java index f4e948cd..f1aceea4 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/api/v1/AnalysisController.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/api/v1/AnalysisController.java @@ -28,9 +28,9 @@ import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus; import com.odysseusinc.arachne.executionengine.execution.AnalysisService; import com.odysseusinc.arachne.executionengine.execution.CallbackService; -import com.odysseusinc.arachne.executionengine.execution.Overseer; import com.odysseusinc.arachne.executionengine.util.AnalisysUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -60,11 +60,13 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestPart; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; @@ -102,6 +104,12 @@ public AnalysisController(AnalysisService analysisService, CallbackService callb this.threadPoolExecutor = threadPoolExecutor; } + @ApiOperation(value = "Status for analysis") + @GetMapping(value = "/status", produces = MediaType.APPLICATION_JSON_VALUE) + public EngineStatus status(@RequestParam(value = "id", required = false) List ids) { + return analysisService.getStatus(ids); + } + @ApiOperation(value = "Files for analysis") @RequestMapping(value = REST_API_ANALYZE, method = RequestMethod.POST, diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/AnalysisService.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/AnalysisService.java index 7ecf6503..cc75aa50 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/AnalysisService.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/AnalysisService.java @@ -33,12 +33,15 @@ import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultStatusDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; import com.odysseusinc.arachne.executionengine.aspect.FileDescriptorCount; import com.odysseusinc.arachne.executionengine.service.CdmMetadataService; import com.odysseusinc.arachne.executionengine.util.AutoCloseWrapper; import java.io.File; +import java.time.Instant; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; @@ -57,6 +60,7 @@ import net.lingala.zip4j.exception.ZipException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -152,6 +156,22 @@ private void attachMetadata(AnalysisSyncRequestDTO analysis, File analysisDir) { } } + public EngineStatus getStatus(List idsMaybe) { + Map statuses = Optional.ofNullable(idsMaybe).map(ids -> + ids.stream().flatMap(id -> + Optional.ofNullable(overseers.get(id)).map(overseer -> + Stream.of(Pair.of(id, getStatus(overseer))) + ).orElseGet(Stream::of) + ).collect(Collectors.toMap(Pair::getKey, Pair::getValue)) + ).orElseGet(Collections::emptyMap); + return new EngineStatus(Instant.now(), statuses); + } + + private static ExecutionOutcome getStatus(Overseer overseer) { + return Optional.ofNullable(overseer.getResult().getNow(null)).orElseGet(() -> + new ExecutionOutcome(Stage.EXECUTE, null, overseer.getStdout()) + ); + } @FileDescriptorCount public AnalysisRequestStatusDTO analyze( diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/FailedOverseer.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/FailedOverseer.java new file mode 100644 index 00000000..7b14e835 --- /dev/null +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/FailedOverseer.java @@ -0,0 +1,52 @@ +package com.odysseusinc.arachne.executionengine.execution; + +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import lombok.Getter; + +/** + * Dummy implementation to provide consistent status and callback reporting for analysis that failed to initialize. + */ +@Getter +public class FailedOverseer implements Overseer { + private final Instant started; + private final String message; + private final AnalysisRequestTypeDTO type; + private final CompletableFuture result; + private final ExecutionOutcome outcome; + private final Throwable error; + + public FailedOverseer(Instant started, String message, AnalysisRequestTypeDTO type, Throwable error) { + this.started = started; + this.message = message; + this.type = type; + this.error = error; + outcome = new ExecutionOutcome(Stage.INITIALIZE, message, message); + result = CompletableFuture.completedFuture(outcome); + } + + @Override + public String getStdout() { + return getMessage(); + } + + @Override + public CompletableFuture abort() { + return result; + } + + @Override + public String getEnvironment() { + return null; + } + + @Override + public Overseer whenComplete(BiConsumer finalizer) { + CompletableFuture.runAsync(() -> finalizer.accept(outcome, error)); + return this; + } +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java index 6f0d3595..d0ddeda3 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java @@ -1,16 +1,24 @@ package com.odysseusinc.arachne.executionengine.execution.r; import com.odysseusinc.arachne.commons.types.DBMSType; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; import com.odysseusinc.arachne.execution_engine_common.util.BigQueryUtils; import com.odysseusinc.arachne.executionengine.config.properties.HiveBulkLoadProperties; import com.odysseusinc.arachne.executionengine.execution.DriverLocations; import com.odysseusinc.arachne.executionengine.execution.ExecutionService; +import com.odysseusinc.arachne.executionengine.execution.FailedOverseer; import com.odysseusinc.arachne.executionengine.execution.KerberosSupport; import com.odysseusinc.arachne.executionengine.execution.Overseer; +import com.odysseusinc.arachne.executionengine.service.ConnectionPoolService; import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,6 +33,8 @@ public abstract class RService implements ExecutionService { protected static final String EXECUTION_COMMAND = "Rscript"; + private static final String HOST_DOCKER_INTERNAL = "host.docker.internal"; + private static final String RUNTIME_ENV_DATA_SOURCE_NAME = "DATA_SOURCE_NAME"; private static final String RUNTIME_ENV_DBMS_USERNAME = "DBMS_USERNAME"; private static final String RUNTIME_ENV_DBMS_PASSWORD = "DBMS_PASSWORD"; @@ -58,6 +68,8 @@ public abstract class RService implements ExecutionService { @Autowired private HiveBulkLoadProperties hiveBulkLoadProperties; @Autowired + private ConnectionPoolService poolService; + @Autowired private DriverLocations drivers; private static String sanitizeFilename(String filename) { @@ -69,6 +81,27 @@ public String getExtension() { } public Overseer analyze(AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer callback, Integer updateInterval) { + Long id = analysis.getId(); + DataSourceUnsecuredDTO dataSource = analysis.getDataSource(); + String jdbcUrl = dataSource.getConnectionString(); + if (jdbcUrl.contains(HOST_DOCKER_INTERNAL)) { + try { + String address = InetAddress.getByName(HOST_DOCKER_INTERNAL).getHostAddress(); + String newUrl = jdbcUrl.replace(HOST_DOCKER_INTERNAL, address); + log.info("Resolved {} = [{}]", HOST_DOCKER_INTERNAL, address); + dataSource.setConnectionString(newUrl); + } catch (UnknownHostException e) { + return new FailedOverseer(Instant.now(), "Unable to resolve to [" + HOST_DOCKER_INTERNAL + "]", AnalysisRequestTypeDTO.R, e); + } + } + log.info("Execution [{}] checking connection to [{}]", id, dataSource.getConnectionString()); + try (Connection conn = poolService.getDataSource(dataSource).getConnection()) { + String name = conn.getMetaData().getDatabaseProductName(); + log.info("Execution [{}] connection verified, engine: [{}]", id, name); + } catch (SQLException e) { + log.info("Execution [{}] connection verification failed [{}]", id, e.getMessage()); + } + File keystoreDir = new File(analysisDir, "keys"); KrbConfig krbConfig = kerberosSupport.getConfig(analysis, keystoreDir);