diff --git a/engine/pom.xml b/engine/pom.xml index 4fa5247..8d24b20 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -359,6 +359,7 @@ 2.13.0 2.12.0 0.31.0 + 1.45.0 /bigquery @@ -428,6 +429,11 @@ opencensus-contrib-http-util ${opencensus.contrib.http.util.version} + + io.grpc + grpc-core + ${io.grpc.grpc-core.version} + diff --git a/engine/src/main/docker/Dockerfile b/engine/src/main/docker/Dockerfile index 994bdbe..1142c18 100644 --- a/engine/src/main/docker/Dockerfile +++ b/engine/src/main/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM eclipse-temurin:8-jdk +FROM eclipse-temurin:8-jre LABEL maintainer="Odysseus Data Services, Inc: support@odysseusinc.com" VOLUME /tmp VOLUME /libs-r @@ -10,9 +10,6 @@ RUN ln -fs /usr/share/zoneinfo/Etc/GMT /etc/localtime && dpkg-reconfigure -f non RUN update-rc.d cron defaults && systemctl enable rsyslog ENV R_INSTALL_SCRIPT=libs.r ADD execution-engine-exec.jar /execution-engine.jar -RUN mkdir /impala -ADD Cloudera_ImpalaJDBC4_2.5.41.zip /impala/impala-jdbc.zip -RUN cd /impala && unzip impala-jdbc.zip && rm -f impala-jdbc.zip RUN mkdir /runtimes COPY descriptor_base.json /runtimes/descriptor_base.json RUN ls -la /runtimes/* diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/AuthEffects.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/AuthEffects.java new file mode 100644 index 0000000..c36f311 --- /dev/null +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/AuthEffects.java @@ -0,0 +1,17 @@ +package com.odysseusinc.arachne.executionengine.auth; + +import java.util.Map; + +public interface AuthEffects { + interface AddEnvironmentVariables extends AuthEffects { + Map getEnvVars(); + } + + interface ModifyUrl extends AuthEffects { + String getNewUrl(); + } + + interface Cleanup extends AuthEffects { + void cleanup(); + } +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/BigQueryCredentialsProvider.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/BigQueryCredentialsProvider.java new file mode 100644 index 0000000..be4b8b0 --- /dev/null +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/BigQueryCredentialsProvider.java @@ -0,0 +1,64 @@ +package com.odysseusinc.arachne.executionengine.auth; + +import com.google.common.collect.ImmutableMap; +import com.odysseusinc.arachne.commons.types.DBMSType; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; + +@Service +@Slf4j +public class BigQueryCredentialsProvider implements CredentialsProvider { + private static final String RUNTIME_BQ_KEYFILE = "BQ_KEYFILE"; + + @Override + public AuthEffects apply(DataSourceUnsecuredDTO dataSource, Path analysisDir, String analysisDirInContainer) { + if (dataSource.getType() == DBMSType.BIGQUERY) { + try { + Path keyFile = Files.createTempFile(analysisDir, "", ".json").toAbsolutePath(); + try (OutputStream out = Files.newOutputStream(keyFile)) { + IOUtils.write(dataSource.getKeyfile(), out); + } + Path innerPath = Paths.get(analysisDirInContainer).resolve(keyFile.getFileName()); + return new Effects(dataSource.getConnectionString(), keyFile, innerPath.toString()); + } catch (IOException e) { + log.error("Failed to resolve BigQuery authentication for Source: [{}]", dataSource.getName(), e); + throw new RuntimeException(e); + } + } else { + return null; + } + } + + @AllArgsConstructor + private static class Effects implements AuthEffects, AuthEffects.AddEnvironmentVariables, AuthEffects.ModifyUrl, AuthEffects.Cleanup { + private final String newUrl; + private final Path keyFile; + private final String keyFileInContainer; + + @Override + public Map getEnvVars() { + return ImmutableMap.of(RUNTIME_BQ_KEYFILE, keyFileInContainer); + } + + @Override + public String getNewUrl() { + return newUrl + ";OAuthPvtKeyPath=" + keyFileInContainer + ";"; + } + + @Override + public void cleanup() { + FileUtils.deleteQuietly(keyFile.toFile()); + } + } +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/CredentialsProvider.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/CredentialsProvider.java new file mode 100644 index 0000000..edb8f4e --- /dev/null +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/CredentialsProvider.java @@ -0,0 +1,9 @@ +package com.odysseusinc.arachne.executionengine.auth; + +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; + +import java.nio.file.Path; + +public interface CredentialsProvider { + AuthEffects apply(DataSourceUnsecuredDTO dataSource, Path analysisDir, String analysisDirInContainer); +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/KerberosCredentialsProvider.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/KerberosCredentialsProvider.java new file mode 100644 index 0000000..daf759c --- /dev/null +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/auth/KerberosCredentialsProvider.java @@ -0,0 +1,60 @@ +package com.odysseusinc.arachne.executionengine.auth; + +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; +import com.odysseusinc.datasourcemanager.krblogin.KerberosService; +import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; +import com.odysseusinc.datasourcemanager.krblogin.RuntimeServiceMode; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; + +@Service +@Slf4j +public class KerberosCredentialsProvider implements CredentialsProvider { + @Autowired + private KerberosService kerberosService; + + @Override + public AuthEffects apply(DataSourceUnsecuredDTO ds, Path analysisDir, String analysisDirInContainer) { + Path keys = analysisDir.resolve("keys"); + File keystoreDir = keys.toFile(); + + if (ds.getUseKerberos()) { + keystoreDir.mkdirs(); + try { + KrbConfig config = kerberosService.runKinit(ds, RuntimeServiceMode.SINGLE, keystoreDir); + return new Effects(config, keys); + } catch (IOException e) { + log.error("Failed to resolve Kerberos auth for Datasource: {}", ds.getName(), e); + throw new RuntimeException(e); + } + } else { + return null; + } + } + + @AllArgsConstructor + private static class Effects implements AuthEffects, AuthEffects.AddEnvironmentVariables, AuthEffects.Cleanup { + private final KrbConfig config; + private final Path keyFile; + + @Override + public Map getEnvVars() { + return config.getIsolatedRuntimeEnvs(); + } + + @Override + public void cleanup() { + FileUtils.deleteQuietly(keyFile.toFile()); + FileUtils.deleteQuietly(config.getComponents().getKeytabPath().toFile()); + FileUtils.deleteQuietly(config.getConfPath().toFile()); + } + } +} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/config/AnalisysConfig.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/config/AnalisysConfig.java index 622dc13..c23652a 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/config/AnalisysConfig.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/config/AnalisysConfig.java @@ -22,6 +22,7 @@ package com.odysseusinc.arachne.executionengine.config; +import com.odysseusinc.datasourcemanager.jdbc.auth.BigQueryAuthResolver; import com.odysseusinc.datasourcemanager.krblogin.KerberosService; import com.odysseusinc.datasourcemanager.krblogin.KerberosServiceImpl; import org.springframework.beans.factory.annotation.Value; @@ -69,4 +70,9 @@ public KerberosService kerberosService() { return new KerberosServiceImpl(timeout, kinitPath, configPath); } + @Bean + public BigQueryAuthResolver bigQueryAuthResolver() { + return new BigQueryAuthResolver(); + }; + } diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/KerberosSupport.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/KerberosSupport.java deleted file mode 100644 index 90c59b2..0000000 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/KerberosSupport.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.odysseusinc.arachne.executionengine.execution; - -import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; -import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; -import com.odysseusinc.datasourcemanager.krblogin.KerberosService; -import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; -import com.odysseusinc.datasourcemanager.krblogin.RuntimeServiceMode; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.io.File; -import java.io.IOException; - -@Slf4j -@Service -public class KerberosSupport { - @Autowired - private KerberosService kerberosService; - - public KrbConfig getConfig(AnalysisSyncRequestDTO analysis, File keystoreDir) { - DataSourceUnsecuredDTO ds = analysis.getDataSource(); - if (ds.getUseKerberos()) { - keystoreDir.mkdirs(); - try { - return kerberosService.runKinit(ds, RuntimeServiceMode.SINGLE, keystoreDir); - } catch (IOException e) { - log.error("Analysis [{}] failed to resolve Kerberos auth for Datasource: {}", analysis.getId(), ds.getName(), e); - throw new RuntimeException(e); - } - } else { - return new KrbConfig(); - } - } -} diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/DockerService.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/DockerService.java index 165c636..0d135e7 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/DockerService.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/DockerService.java @@ -15,10 +15,10 @@ import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; import com.odysseusinc.arachne.execution_engine_common.descriptor.dto.DockerEnvironmentDTO; +import com.odysseusinc.arachne.executionengine.auth.AuthEffects; import com.odysseusinc.arachne.executionengine.config.properties.DockerRegistryProperties; import com.odysseusinc.arachne.executionengine.execution.Overseer; import com.odysseusinc.arachne.executionengine.util.Streams; -import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -95,9 +95,9 @@ public DockerService(DockerRegistryProperties properties) { @Override protected Overseer analyze( - AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer sendCallback, Integer updateInterval, KrbConfig krbConfig + AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer sendCallback, Integer updateInterval, AuthEffects auth ) { - List env = buildRuntimeEnvVariables(analysis.getDataSource(), krbConfig.getIsolatedRuntimeEnvs()).entrySet().stream().map(entry -> + List env = buildRuntimeEnvVariables(analysis.getDataSource(), auth).entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.toList() ); Instant started = Instant.now(); diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java index a8927f7..9a8f429 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/RService.java @@ -1,19 +1,18 @@ package com.odysseusinc.arachne.executionengine.execution.r; -import com.odysseusinc.arachne.commons.types.DBMSType; +import com.google.common.util.concurrent.UncheckedExecutionException; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestTypeDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; -import com.odysseusinc.arachne.execution_engine_common.util.BigQueryUtils; +import com.odysseusinc.arachne.executionengine.auth.AuthEffects; +import com.odysseusinc.arachne.executionengine.auth.AuthEffects.AddEnvironmentVariables; +import com.odysseusinc.arachne.executionengine.auth.CredentialsProvider; import com.odysseusinc.arachne.executionengine.config.properties.HiveBulkLoadProperties; import com.odysseusinc.arachne.executionengine.execution.DriverLocations; import com.odysseusinc.arachne.executionengine.execution.FailedOverseer; -import com.odysseusinc.arachne.executionengine.execution.KerberosSupport; import com.odysseusinc.arachne.executionengine.execution.Overseer; import com.odysseusinc.arachne.executionengine.service.ConnectionPoolService; -import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -21,10 +20,12 @@ import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.file.Path; import java.sql.Connection; import java.sql.SQLException; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; @@ -55,14 +56,11 @@ public abstract class RService { private static final String RUNTIME_ENV_LC_ALL_KEY = "LC_ALL"; private static final String RUNTIME_ENV_LC_ALL_VALUE = "en_US.UTF-8"; private static final String RUNTIME_ENV_DRIVER_PATH = "JDBC_DRIVER_PATH"; - private static final String RUNTIME_BQ_KEYFILE = "BQ_KEYFILE"; @Value("${runtime.killTimeoutSec:30}") protected int killTimeoutSec; @Value("${runtime.timeOutSec}") protected int runtimeTimeOutSec; - @Autowired - private KerberosSupport kerberosSupport; @Value("${bulkload.enableMPP}") private boolean enableMPP; @Autowired @@ -71,6 +69,8 @@ public abstract class RService { private ConnectionPoolService poolService; @Autowired private DriverLocations drivers; + @Autowired + private List credentialProviders; private static String sanitizeFilename(String filename) { return Objects.requireNonNull(filename).replaceAll("[<>:\"/\\\\|?*\\u0000]", ""); @@ -90,31 +90,40 @@ public Overseer analyze(AnalysisSyncRequestDTO analysis, File analysisDir, BiCon return new FailedOverseer(Instant.now(), "Unable to resolve to [" + HOST_DOCKER_INTERNAL + "]", AnalysisRequestTypeDTO.R, e); } } + + Path path = analysisDir.toPath(); + AuthEffects auth = credentialProviders.stream().map(provider -> + provider.apply(dataSource, path, path.toAbsolutePath().toString()) + ).filter(Objects::nonNull).findFirst().orElse(null); + + if (auth instanceof AuthEffects.ModifyUrl) { + String newUrl = ((AuthEffects.ModifyUrl) auth).getNewUrl(); + dataSource.setConnectionString(newUrl); + } + log.info("Execution [{}] checking connection to [{}]", id, dataSource.getConnectionString()); try (Connection conn = poolService.getDataSource(dataSource).getConnection()) { String name = conn.getMetaData().getDatabaseProductName(); log.info("Execution [{}] connection verified, engine: [{}]", id, name); - } catch (SQLException e) { + } catch (SQLException | UncheckedExecutionException e) { log.info("Execution [{}] connection verification failed [{}]", id, e.getMessage()); } - File keystoreDir = new File(analysisDir, "keys"); - KrbConfig krbConfig = kerberosSupport.getConfig(analysis, keystoreDir); - Overseer overseer = analyze(analysis, analysisDir, callback, updateInterval, krbConfig).whenComplete((outcome, throwable) -> { - // Keystore folder must be deleted before zipping results - FileUtils.deleteQuietly(keystoreDir); + Overseer overseer = analyze(analysis, analysisDir, callback, updateInterval, auth).whenComplete((outcome, throwable) -> { + if (auth instanceof AuthEffects.Cleanup) { + ((AuthEffects.Cleanup) auth).cleanup(); + } }); log.info("Execution [{}] started in R Runtime Service", analysis.getId()); return overseer; } - protected abstract Overseer analyze(AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer callback, Integer updateInterval, KrbConfig krbConfig); - - protected Map buildRuntimeEnvVariables(DataSourceUnsecuredDTO dataSource, Map krbProps) { + protected abstract Overseer analyze(AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer callback, Integer updateInterval, AuthEffects auth); - Map environment = new HashMap<>(krbProps); + protected Map buildRuntimeEnvVariables(DataSourceUnsecuredDTO dataSource, AuthEffects auth) { + Map environment = new HashMap<>(); environment.put(RUNTIME_ENV_DATA_SOURCE_NAME, RService.sanitizeFilename(dataSource.getName())); environment.put(RUNTIME_ENV_DBMS_USERNAME, dataSource.getUsername()); environment.put(RUNTIME_ENV_DBMS_PASSWORD, dataSource.getPassword()); @@ -125,12 +134,14 @@ protected Map buildRuntimeEnvVariables(DataSourceUnsecuredDTO da environment.put(RUNTIME_ENV_RESULT_SCHEMA, dataSource.getResultSchema()); environment.put(RUNTIME_ENV_COHORT_TARGET_TABLE, dataSource.getCohortTargetTable()); environment.put(RUNTIME_ENV_DRIVER_PATH, drivers.getPath(dataSource.getType())); - environment.put(RUNTIME_BQ_KEYFILE, getBigQueryKeyFile(dataSource)); environment.put(RUNTIME_ENV_PATH_KEY, RUNTIME_ENV_PATH_VALUE); environment.put(RUNTIME_ENV_HOME_KEY, getUserHome()); environment.put(RUNTIME_ENV_HOSTNAME_KEY, RUNTIME_ENV_HOSTNAME_VALUE); environment.put(RUNTIME_ENV_LANG_KEY, RUNTIME_ENV_LANG_VALUE); environment.put(RUNTIME_ENV_LC_ALL_KEY, RUNTIME_ENV_LC_ALL_VALUE); + if (auth instanceof AddEnvironmentVariables) { + environment.putAll(((AddEnvironmentVariables) auth).getEnvVars()); + } if (enableMPP) { exposeMPPEnvironmentVariables(environment); @@ -161,9 +172,4 @@ private String getUserHome() { return StringUtils.defaultString(userHome, RUNTIME_ENV_HOME_VALUE); } - private String getBigQueryKeyFile(DataSourceUnsecuredDTO dataSource) { - - return dataSource.getType().equals(DBMSType.BIGQUERY) ? - BigQueryUtils.getBigQueryKeyPath(dataSource.getConnectionString()) : null; - } } diff --git a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/TarballRService.java b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/TarballRService.java index e15c39e..5bad217 100644 --- a/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/TarballRService.java +++ b/engine/src/main/java/com/odysseusinc/arachne/executionengine/execution/r/TarballRService.java @@ -25,6 +25,7 @@ import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome; +import com.odysseusinc.arachne.executionengine.auth.AuthEffects; import com.odysseusinc.arachne.executionengine.config.runtimeservice.RIsolatedRuntimeProperties; import com.odysseusinc.arachne.executionengine.execution.Overseer; import com.odysseusinc.arachne.executionengine.model.descriptor.Descriptor; @@ -33,7 +34,6 @@ import com.odysseusinc.arachne.executionengine.model.descriptor.r.RDependency; import com.odysseusinc.arachne.executionengine.model.descriptor.r.RExecutionRuntime; import com.odysseusinc.arachne.executionengine.service.DescriptorService; -import com.odysseusinc.datasourcemanager.krblogin.KrbConfig; import com.odysseusinc.datasourcemanager.krblogin.RuntimeServiceMode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -91,7 +91,7 @@ public void init() { } @Override - protected Overseer analyze(AnalysisSyncRequestDTO analysis, File file, BiConsumer callback, Integer updateInterval, KrbConfig krbConfig) { + protected Overseer analyze(AnalysisSyncRequestDTO analysis, File file, BiConsumer callback, Integer updateInterval, AuthEffects authEffects) { DescriptorBundle descriptorBundle = descriptorService.getDescriptorBundle( file, analysis.getId(), analysis.getRequestedDescriptorId() ); @@ -101,7 +101,7 @@ protected Overseer analyze(AnalysisSyncRequestDTO analysis, File file, BiConsume try { Instant started = Instant.now(); - Map envp = buildRuntimeEnvVariables(dataSource, krbConfig.getIsolatedRuntimeEnvs()); + Map envp = buildRuntimeEnvVariables(dataSource, authEffects); File jailFile = new File(rIsolatedRuntimeProps.getJailSh()); boolean externalJail = jailFile.isFile(); File runFile = externalJail ? jailFile : extractToTempFile(resourceLoader, "classpath:/jail.sh", "ee", ".sh"); @@ -121,10 +121,6 @@ protected Overseer analyze(AnalysisSyncRequestDTO analysis, File file, BiConsume if (!externalJail) { FileUtils.deleteQuietly(runFile); } - FileUtils.deleteQuietly(krbConfig.getComponents().getKeytabPath().toFile()); - if (RuntimeServiceMode.ISOLATED == getRuntimeServiceMode()) { - FileUtils.deleteQuietly(krbConfig.getConfPath().toFile()); - } cleanupEnv(file, outcome); });