Skip to content

Commit

Permalink
Merge pull request #370 from OHDSI/docker-support
Browse files Browse the repository at this point in the history
Add Docker support
  • Loading branch information
dmitrys-odysseus authored Jan 9, 2024
2 parents c89282a + 7da6750 commit 5fa782a
Show file tree
Hide file tree
Showing 25 changed files with 560 additions and 240 deletions.
Original file line number Diff line number Diff line change
@@ -1,41 +1,21 @@
package com.odysseusinc.arachne.execution_engine_common.descriptor.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class RuntimeEnvironmentDescriptorsDTO {

@JsonProperty
private boolean docker;
@JsonProperty
private List<RuntimeEnvironmentDescriptorDTO> descriptors;

public RuntimeEnvironmentDescriptorsDTO() {
}

public RuntimeEnvironmentDescriptorsDTO(List<RuntimeEnvironmentDescriptorDTO> descriptors) {
this.descriptors = descriptors;
}

public RuntimeEnvironmentDescriptorsDTO(boolean docker, List<RuntimeEnvironmentDescriptorDTO> descriptors) {
this.docker = docker;
this.descriptors = descriptors;
}

public List<RuntimeEnvironmentDescriptorDTO> getDescriptors() {
return descriptors;
}

public void setDescriptors(List<RuntimeEnvironmentDescriptorDTO> descriptors) {
this.descriptors = descriptors;
}

public boolean isDocker() {
return docker;
}

public void setDocker(boolean docker) {
this.docker = docker;
}
}
59 changes: 58 additions & 1 deletion engine/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
# ArachneExecutionEngine
Arachne Execution Engine is a component used to execute remote SQL or R code. It is used by both Arachne Data Node as well as WebAPI

## Configuration

### Sample options for creating a container for running locally

Generic options:

--rm // Remove on exit
-p 8888:8888 // Bind host port to container port
--add-host=host.docker.internal:host-gateway // Allow access to DB running on host bare

For using tarball execution environments:

-e RUNTIMESERVICE_DIST_ARCHIVE=/dist/r_base_focal_amd64.tar.gz // Name of the default execution environment
-v /etc/environments:/runtimes // Mount host directory volume

For using Docker execution environments:

--privileged // Allow spawning other containers
-v /var/run/docker.sock:/var/run/docker.sock // Mount socket to connect to host Docker from inside container
-v /etc/ee:/etc/executions // Mount host directory /etc/ee to volume /etc/executions in container to hold executions
-e DOCKER_ENABLE=true // Enable execution in Docker container
-e DOCKER_IMAGE_DEFAULT=odysseusinc/r-hades:2023q3v3 // Default image to use for running executions
-e ANALYSIS_MOUNT=/etc/ee // Provide container location of the host directory for executions to allow mounting it spawn Docker containers
-e DOCKER_REGISTRY_URL=... // (Optional) url to Docker registry for pulling image runtime files
-e **DOCKER**_REGISTRY_USERNAME=... // (Optional) username to connect to Docker registry
-e DOCKER_REGISTRY_PASSWORD=... // (Optional) password to connect to Docker registry

## Build with Impala JDBC driver

1. Download Cloudera JDBC Connector using the following link:
Expand Down Expand Up @@ -37,4 +64,34 @@ of the following dbms types with `cdm.dbms` parameter:
- oracle
- implala
- bigquery
- netezza
- netezza

## Process R files with Docker (locally)

- You can use the following template for testing purposes:

```bash
curl --location 'https://localhost:8888/api/v1/analyze' \
--header 'arachne-compressed: false' \
--header 'arachne-waiting-compressed-result: false' \
--header 'arachne-attach-cdm-metadata: true' \
--header 'arachne-result-chunk-size-mb: 10485760' \
--form 'analysisRequest="{
\"id\": 123,
\"executableFileName\": \"main.R\",
\"dataSource\": {
\"id\": 123,
\"name\": \"Data Source\",
\"url\": \"https://test.com"
},
\"requested\": \"2023-12-19T10:00:00Z\",
\"requestedDescriptorId\": \"789\",
\"resultExclusions\": \"exclude_result1,exclude_result2\",
\"dockerImage\": \"r-base\",
\"callbackPassword\": \"callback-password\",
\"updateStatusCallback\": \"https://callback-url.com/update\",
\"resultCallback\": \"https://callback-url.com/result\"
}";type=application/json' \
--form 'file=@"/Downloads/main.R"' \
--form 'container="r-base"'
```
14 changes: 14 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<mysql.version>8.0.28</mysql.version>
<postgresql.version>42.3.7</postgresql.version>
<snakeyaml.version>1.33</snakeyaml.version>
<docker-java.version>3.3.4</docker-java.version>

</properties>

Expand Down Expand Up @@ -181,6 +182,19 @@
<artifactId>lombok</artifactId>
</dependency>

<!-- Docker -->
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>${docker-java.version}</version>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-httpclient5</artifactId>
<version>${docker-java.version}</version>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpEntity;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class AnalysisController {
private final CallbackService callbackService;
private final ThreadPoolTaskExecutor threadPoolExecutor;

@Value("${analysis.dir:/etc/executions}")
private String analysisParentDir;

@Autowired
public AnalysisController(AnalysisService analysisService, CallbackService callbackService,
ThreadPoolTaskExecutor threadPoolExecutor) {
Expand All @@ -104,6 +108,7 @@ public AnalysisController(AnalysisService analysisService, CallbackService callb
consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE
)

public AnalysisRequestStatusDTO analyze(
@RequestPart("analysisRequest") @Valid AnalysisRequestDTO analysisRequest,
@RequestPart("file") List<MultipartFile> files,
Expand All @@ -116,10 +121,9 @@ public AnalysisRequestStatusDTO analyze(
Long id = analysisRequest.getId();
try {
log.info("Request [{}] for [{}] received", id, analysisRequest.getResultCallback());
final File analysisDir = AnalisysUtils.extractFiles(files, compressed);
log.info("Request [{}] extracted {} files", id,
Optional.ofNullable(analysisDir.list()).map(dir -> dir.length).orElse(-1)
);
File analysisDir = AnalisysUtils.extractFiles(files, analysisParentDir, compressed);
Integer extracted = Optional.ofNullable(analysisDir.list()).map(dir -> dir.length).orElse(-1);
log.info("Request [{}] extracted {} files to [{}]", id, extracted, analysisDir.getAbsolutePath());
AnalysisRequestStatusDTO result = analysisService.analyze(analysisRequest, analysisDir, waitCompressedResult, attachCdmMetadata, chunkSize);
log.info("Request [{}] of type [{}] accepted into processing", id, result.getType());
return result;
Expand All @@ -145,7 +149,7 @@ public ResponseEntity<MultiValueMap<String, Object>> analyzeSync(

Long id = analysisRequest.getId();
log.info("Started processing request for synchronous analysis ID = {}", id);
final File analysisDir = AnalisysUtils.extractFiles(files, false);
final File analysisDir = AnalisysUtils.extractFiles(files, analysisParentDir, false);
log.info("Extracted files for synchronous analysis ID = {}", id);

StringBuilder stdoutBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
Expand All @@ -32,6 +33,9 @@ public class DescriptorController {

private static final DescriptorConverter descriptorConverter = new DescriptorConverter();

@Value("${docker.enable:false}")
private boolean useDocker;

public DescriptorController(DescriptorService descriptorService) {
this.descriptorService = descriptorService;
}
Expand All @@ -40,7 +44,7 @@ public DescriptorController(DescriptorService descriptorService) {
@RequestMapping(value = REST_API_DESCRIPTORS, method = RequestMethod.GET)
public RuntimeEnvironmentDescriptorsDTO getDescriptors() {
Stream<Descriptor> descriptors = descriptorService.getDescriptors().map(Collection::stream).orElseGet(Stream::of);
return new RuntimeEnvironmentDescriptorsDTO(descriptors.map(descriptorConverter::toDto).collect(Collectors.toList()));
return new RuntimeEnvironmentDescriptorsDTO(useDocker, descriptors.map(descriptorConverter::toDto).collect(Collectors.toList()));
}

@ApiOperation(value = "Runtimes with identifier for analysis")
Expand All @@ -50,6 +54,6 @@ public RuntimeEnvironmentDescriptorsDTO getDescriptors(@PathVariable String id)
List<RuntimeEnvironmentDescriptorDTO> descriptorDTOS = descriptors.stream()
.map(descriptorConverter::toDto)
.collect(Collectors.toList());
return new RuntimeEnvironmentDescriptorsDTO(descriptorDTOS);
return new RuntimeEnvironmentDescriptorsDTO(useDocker, descriptorDTOS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class FileDescriptorCountAspect {

private static final Logger LOGGER = LoggerFactory.getLogger(FileDescriptorCountAspect.class);

@Value("${logging.descriptor.count.enabled}")
@Value("${logging.descriptor.count.enabled:false}")
private boolean enabled;

@Around("@annotation(com.odysseusinc.arachne.executionengine.aspect.FileDescriptorCount)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

package com.odysseusinc.arachne.executionengine.config;

import com.odysseusinc.arachne.executionengine.config.properties.DockerRegistryProperties;
import com.odysseusinc.arachne.executionengine.execution.ExecutionService;
import com.odysseusinc.arachne.executionengine.execution.r.DockerService;
import com.odysseusinc.arachne.executionengine.execution.r.TarballRService;
import com.odysseusinc.datasourcemanager.krblogin.KerberosService;
import com.odysseusinc.datasourcemanager.krblogin.KerberosServiceImpl;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -46,6 +50,8 @@ public class AnalisysConfig {
private String kinitPath;
@Value("${kerberos.configPath}")
private String configPath;
@Value("${docker.enable:false}")
private boolean useDocker;

@Bean(name = "analysisTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
Expand All @@ -68,4 +74,9 @@ public KerberosService kerberosService() {

return new KerberosServiceImpl(timeout, kinitPath, configPath);
}

@Bean
ExecutionService runtimeService(DockerRegistryProperties properties) {
return useDocker ? new DockerService(properties) : new TarballRService();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.odysseusinc.arachne.executionengine.config.properties;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "docker.registry")
public class DockerRegistryProperties {
private String url;
private String username;
private String pasword;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class RIsolatedRuntimeProperties {
// Path to default runtime environment
private String archive;
// Path to folder with custom runtime environments
private String archiveFolder;
private String archiveFolder = "/runtimes";
// Flag for showing difference between dependencies
private boolean applyRuntimeDependenciesComparisonLogic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import lombok.Getter;

public abstract class AbstractOverseer implements Overseer {
/**
* Execution id for logging.
*/
protected final long id;
protected final StringBuffer stdout = new StringBuffer();
protected final StringBuffer stdout;
protected final BiConsumer<String, String> callback;
/**
* A pure execution result
Expand All @@ -18,20 +21,24 @@ public abstract class AbstractOverseer implements Overseer {
protected final Instant started;
@Getter
protected final String environment;
protected final int killTimeout;
/**
* Timeout to wait for kill command to complete, in seconds
*/
protected final int killTimeoutSec;
/**
* Execution result after applying post-processing
*/
@Getter
protected volatile CompletableFuture<ExecutionOutcome> result;

public AbstractOverseer(long id, BiConsumer<String, String> callback, Instant started, String environment, int killTimeout, CompletableFuture<ExecutionOutcome> outcome) {
public AbstractOverseer(long id, BiConsumer<String, String> callback, Instant started, String environment, int killTimeoutSec, StringBuffer stdout, CompletableFuture<ExecutionOutcome> outcome) {
this.id = id;
this.callback = callback;
this.started = started;
this.environment = environment;
this.killTimeout = killTimeout;
this.killTimeoutSec = killTimeoutSec;
result = this.outcome = outcome;
this.stdout = stdout;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import com.odysseusinc.arachne.executionengine.aspect.FileDescriptorCount;
import com.odysseusinc.arachne.executionengine.service.CdmMetadataService;
import com.odysseusinc.arachne.executionengine.util.AutoCloseWrapper;
import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -49,6 +51,8 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import net.lingala.zip4j.exception.ZipException;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -60,11 +64,6 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
@Service
public class AnalysisService {
Expand Down Expand Up @@ -137,10 +136,10 @@ private ExecutionOutcome abort(Long id, Overseer overseer) {
return new ExecutionOutcome(Stage.ABORT, "Waiting for abort interrupted", overseer.getStdout());
} catch (ExecutionException e) {
log.info("Execution [{}] abort attempt failed", id, e);
return new ExecutionOutcome(Stage.ABORT, e.getMessage(), overseer.getStdout() + "\n" + getStackTrace(e.getCause()));
return new ExecutionOutcome(Stage.ABORT, e.getMessage(), overseer.getStdout() + "\r\n" + getStackTrace(e.getCause()));
} catch (TimeoutException e) {
log.info("Execution [{}] waiting for abort timed out", id);
return new ExecutionOutcome(Stage.ABORT, e.getMessage(), overseer.getStdout() + "\n" + getStackTrace(e.getCause()));
return new ExecutionOutcome(Stage.ABORT, e.getMessage(), overseer.getStdout() + "\r\n" + getStackTrace(e.getCause()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void sendResults(AnalysisResultDTO result, Collection<FileSystemResource>
ctx -> {
Throwable t = ctx.getLastThrowable();
if (t == null) {
log.warn("Execution [{}], send result: {} - {}", id, result.getStage(), result.getError());
log.warn("Execution [{}] send result: {} - {}", id, result.getStage(), result.getError());
} else {
log.info("Execution [{}], retry send result after error: {}", id, t.getMessage());
log.info("Execution [{}] retry send result after error: {}", id, t.getMessage());
}
ResponseEntity<String> sent = executeSend(result, files, url, password);
log.info("Execution [{}] result status sent, response HTTP {}", id, sent.getStatusCode());
Expand Down
Loading

0 comments on commit 5fa782a

Please sign in to comment.