Merge pull request #390 from OHDSI/status-endpoint
Add /api/v1/status endpoint
dmitrys-odysseus authored Jul 25, 2024
2 parents 1cf0dd4 + deca8a6 commit 136e443
Showing 5 changed files with 137 additions and 1 deletion.
@@ -0,0 +1,23 @@
package com.odysseusinc.arachne.execution_engine_common.api.v1.dto;

import java.time.Instant;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class EngineStatus {
/**
* The moment when the Execution Engine (EE) started.
*/
private Instant started;
/**
* Execution outcomes for the submissions currently tracked by the engine, keyed by submission id.
*/
private Map<Long, ExecutionOutcome> submissions;
}
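
EngineStatus pairs the engine start time with execution outcomes keyed by submission id. A minimal sketch of the JSON it serializes to, assuming Jackson with the JavaTimeModule registered (the usual Spring Boot setup); the submission id 42 and the outcome values are illustrative only, not taken from this commit:

// Illustrative only: shows the wire shape of EngineStatus under a default-ish Jackson setup.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage;
import java.time.Instant;
import java.util.Map;

public class EngineStatusShapeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = JsonMapper.builder().addModule(new JavaTimeModule()).build();
        EngineStatus status = new EngineStatus(
                Instant.parse("2024-07-25T10:00:00Z"),
                Map.of(42L, new ExecutionOutcome(Stage.EXECUTE, null, "stdout so far ...")));
        // Prints something like {"started":...,"submissions":{"42":{...}}};
        // the exact Instant and outcome rendering depends on the mapper configuration.
        System.out.println(mapper.writeValueAsString(status));
    }
}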
@@ -28,9 +28,9 @@
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus;
import com.odysseusinc.arachne.executionengine.execution.AnalysisService;
import com.odysseusinc.arachne.executionengine.execution.CallbackService;
import com.odysseusinc.arachne.executionengine.execution.Overseer;
import com.odysseusinc.arachne.executionengine.util.AnalisysUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -60,11 +60,13 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@@ -102,6 +104,12 @@ public AnalysisController(AnalysisService analysisService, CallbackService callb
this.threadPoolExecutor = threadPoolExecutor;
}

@ApiOperation(value = "Status for analysis")
@GetMapping(value = "/status", produces = MediaType.APPLICATION_JSON_VALUE)
public EngineStatus status(@RequestParam(value = "id", required = false) List<Long> ids) {
return analysisService.getStatus(ids);
}

@ApiOperation(value = "Files for analysis")
@RequestMapping(value = REST_API_ANALYZE,
method = RequestMethod.POST,
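The new mapping takes an optional, repeatable id query parameter; when no ids are supplied the service returns an empty submissions map. A sketch of calling the endpoint with the plain Java 11+ HTTP client; the engine host and port and the submission ids are assumptions for illustration, not taken from this commit:

// Illustrative only: host, port and ids are assumed, not part of the commit.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StatusEndpointCallSketch {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        // Ask for submissions 101 and 102; ids the engine does not track are simply
        // absent from the "submissions" map in the response.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888/api/v1/status?id=101&id=102"))
                .GET()
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}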
@@ -33,12 +33,15 @@
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultStatusDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage;
import com.odysseusinc.arachne.executionengine.aspect.FileDescriptorCount;
import com.odysseusinc.arachne.executionengine.service.CdmMetadataService;
import com.odysseusinc.arachne.executionengine.util.AutoCloseWrapper;
import java.io.File;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -57,6 +60,7 @@
import net.lingala.zip4j.exception.ZipException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -152,6 +156,22 @@ private void attachMetadata(AnalysisSyncRequestDTO analysis, File analysisDir) {
}
}

public EngineStatus getStatus(List<Long> idsMaybe) {
Map<Long, ExecutionOutcome> statuses = Optional.ofNullable(idsMaybe).map(ids ->
ids.stream().flatMap(id ->
Optional.ofNullable(overseers.get(id)).map(overseer ->
Stream.of(Pair.of(id, getStatus(overseer)))
).orElseGet(Stream::of)
).collect(Collectors.toMap(Pair::getKey, Pair::getValue))
).orElseGet(Collections::emptyMap);
return new EngineStatus(Instant.now(), statuses);
}

private static ExecutionOutcome getStatus(Overseer overseer) {
return Optional.ofNullable(overseer.getResult().getNow(null)).orElseGet(() ->
new ExecutionOutcome(Stage.EXECUTE, null, overseer.getStdout())
);
}

@FileDescriptorCount
public AnalysisRequestStatusDTO analyze(
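getStatus reports only the requested ids that still have a live overseer: unknown ids are silently skipped, and a null id list yields an empty map rather than every submission. A plain-loop equivalent of the Optional/Stream pipeline, with the per-overseer fallback from getStatus(Overseer) inlined; the method name and the explicit overseers parameter are invented to keep the sketch self-contained:

// Illustrative only: mirrors the behavior of getStatus(List<Long>) and getStatus(Overseer) above.
static Map<Long, ExecutionOutcome> collectStatuses(List<Long> idsMaybe, Map<Long, Overseer> overseers) {
    Map<Long, ExecutionOutcome> statuses = new HashMap<>();
    if (idsMaybe != null) {
        for (Long id : idsMaybe) {
            Overseer overseer = overseers.get(id);
            if (overseer == null) {
                continue; // nothing registered under this id
            }
            // Finished submissions expose their final outcome through the result future;
            // running ones report Stage.EXECUTE with the stdout accumulated so far.
            ExecutionOutcome outcome = overseer.getResult().getNow(null);
            if (outcome == null) {
                outcome = new ExecutionOutcome(Stage.EXECUTE, null, overseer.getStdout());
            }
            statuses.put(id, outcome);
        }
    }
    return statuses;
}

The committed version then wraps the collected map together with Instant.now() into an EngineStatus.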
@@ -0,0 +1,52 @@
package com.odysseusinc.arachne.executionengine.execution;

import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import lombok.Getter;

/**
* Dummy implementation to provide consistent status and callback reporting for analysis that failed to initialize.
*/
@Getter
public class FailedOverseer implements Overseer {
private final Instant started;
private final String message;
private final AnalysisRequestTypeDTO type;
private final CompletableFuture<ExecutionOutcome> result;
private final ExecutionOutcome outcome;
private final Throwable error;

public FailedOverseer(Instant started, String message, AnalysisRequestTypeDTO type, Throwable error) {
this.started = started;
this.message = message;
this.type = type;
this.error = error;
outcome = new ExecutionOutcome(Stage.INITIALIZE, message, message);
result = CompletableFuture.completedFuture(outcome);
}

@Override
public String getStdout() {
return getMessage();
}

@Override
public CompletableFuture<ExecutionOutcome> abort() {
return result;
}

@Override
public String getEnvironment() {
return null;
}

@Override
public Overseer whenComplete(BiConsumer<ExecutionOutcome, Throwable> finalizer) {
CompletableFuture.runAsync(() -> finalizer.accept(outcome, error));
return this;
}
}
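
FailedOverseer lets a submission that never got past initialization flow through the same status and callback machinery as a real run: the result future is completed up front with an INITIALIZE-stage outcome whose message doubles as stdout, and whenComplete hands that outcome plus the original error to the finalizer asynchronously. A small usage sketch with illustrative values:

// Illustrative only: not part of this commit.
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome;
import com.odysseusinc.arachne.executionengine.execution.FailedOverseer;
import java.net.UnknownHostException;
import java.time.Instant;

public class FailedOverseerSketch {
    public static void main(String[] args) {
        FailedOverseer overseer = new FailedOverseer(
                Instant.now(),
                "Unable to resolve [host.docker.internal]",
                AnalysisRequestTypeDTO.R,
                new UnknownHostException("host.docker.internal"));

        // Already completed: stage INITIALIZE, with the message mirrored into stdout.
        ExecutionOutcome outcome = overseer.getResult().getNow(null);
        System.out.println(outcome != null ? "outcome available immediately" : "unexpected: not completed");

        // The finalizer receives that same outcome plus the original error on a common-pool thread.
        overseer.whenComplete((o, error) -> System.out.println("failed: " + error.getMessage()));
    }
}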
@@ -1,16 +1,24 @@
package com.odysseusinc.arachne.executionengine.execution.r;

import com.odysseusinc.arachne.commons.types.DBMSType;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO;
import com.odysseusinc.arachne.execution_engine_common.util.BigQueryUtils;
import com.odysseusinc.arachne.executionengine.config.properties.HiveBulkLoadProperties;
import com.odysseusinc.arachne.executionengine.execution.DriverLocations;
import com.odysseusinc.arachne.executionengine.execution.ExecutionService;
import com.odysseusinc.arachne.executionengine.execution.FailedOverseer;
import com.odysseusinc.arachne.executionengine.execution.KerberosSupport;
import com.odysseusinc.arachne.executionengine.execution.Overseer;
import com.odysseusinc.arachne.executionengine.service.ConnectionPoolService;
import com.odysseusinc.datasourcemanager.krblogin.KrbConfig;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -25,6 +33,8 @@
public abstract class RService implements ExecutionService {
protected static final String EXECUTION_COMMAND = "Rscript";

private static final String HOST_DOCKER_INTERNAL = "host.docker.internal";

private static final String RUNTIME_ENV_DATA_SOURCE_NAME = "DATA_SOURCE_NAME";
private static final String RUNTIME_ENV_DBMS_USERNAME = "DBMS_USERNAME";
private static final String RUNTIME_ENV_DBMS_PASSWORD = "DBMS_PASSWORD";
@@ -58,6 +68,8 @@ public abstract class RService implements ExecutionService {
@Autowired
private HiveBulkLoadProperties hiveBulkLoadProperties;
@Autowired
private ConnectionPoolService poolService;
@Autowired
private DriverLocations drivers;

private static String sanitizeFilename(String filename) {
@@ -69,6 +81,27 @@ public String getExtension() {
}

public Overseer analyze(AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer<String, String> callback, Integer updateInterval) {
Long id = analysis.getId();
DataSourceUnsecuredDTO dataSource = analysis.getDataSource();
String jdbcUrl = dataSource.getConnectionString();
if (jdbcUrl.contains(HOST_DOCKER_INTERNAL)) {
try {
String address = InetAddress.getByName(HOST_DOCKER_INTERNAL).getHostAddress();
String newUrl = jdbcUrl.replace(HOST_DOCKER_INTERNAL, address);
log.info("Resolved {} = [{}]", HOST_DOCKER_INTERNAL, address);
dataSource.setConnectionString(newUrl);
} catch (UnknownHostException e) {
return new FailedOverseer(Instant.now(), "Unable to resolve [" + HOST_DOCKER_INTERNAL + "]", AnalysisRequestTypeDTO.R, e);
}
}
log.info("Execution [{}] checking connection to [{}]", id, dataSource.getConnectionString());
try (Connection conn = poolService.getDataSource(dataSource).getConnection()) {
String name = conn.getMetaData().getDatabaseProductName();
log.info("Execution [{}] connection verified, engine: [{}]", id, name);
} catch (SQLException e) {
log.info("Execution [{}] connection verification failed [{}]", id, e.getMessage());
}

File keystoreDir = new File(analysisDir, "keys");
KrbConfig krbConfig = kerberosSupport.getConfig(analysis, keystoreDir);

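The pre-flight added to analyze() rewrites host.docker.internal in the JDBC URL to its resolved address, returning a FailedOverseer when resolution fails, and then logs a best-effort connection check that does not abort the run on SQLException. The URL rewrite, pulled out as a standalone helper purely for illustration (the commit performs it inline; the imports match those the commit already adds):

// Illustrative extraction of the URL rewrite above, not how the commit factors it.
static String resolveDockerHost(String jdbcUrl) throws UnknownHostException {
    String marker = "host.docker.internal";
    if (!jdbcUrl.contains(marker)) {
        return jdbcUrl; // ordinary hostnames and IPs pass through untouched
    }
    // Throws UnknownHostException outside a Docker-aware environment, which the commit
    // surfaces as a FailedOverseer instead of launching the R process.
    String address = InetAddress.getByName(marker).getHostAddress();
    return jdbcUrl.replace(marker, address);
}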
