Skip to content

Commit

Permalink
Merge pull request #400 from OHDSI/bigquery
Browse files Browse the repository at this point in the history
Reinstate support for BigQuery
  • Loading branch information
dmitrys-odysseus authored Oct 8, 2024
2 parents 30eedd9 + 63b9203 commit cc5549e
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 73 deletions.
6 changes: 6 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@
<google.gax.version>2.13.0</google.gax.version>
<proto.google.cloud.bigquerystorage.v1.version>2.12.0</proto.google.cloud.bigquerystorage.v1.version>
<opencensus.contrib.http.util.version>0.31.0</opencensus.contrib.http.util.version>
<io.grpc.grpc-core.version>1.45.0</io.grpc.grpc-core.version>
<!-- BigQuery JDBC driver path -->
<bigquery.classpath>/bigquery</bigquery.classpath>
</properties>
Expand Down Expand Up @@ -428,6 +429,11 @@
<artifactId>opencensus-contrib-http-util</artifactId>
<version>${opencensus.contrib.http.util.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>${io.grpc.grpc-core.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
5 changes: 1 addition & 4 deletions engine/src/main/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM eclipse-temurin:8-jdk
FROM eclipse-temurin:8-jre
LABEL maintainer="Odysseus Data Services, Inc: [email protected]"
VOLUME /tmp
VOLUME /libs-r
Expand All @@ -10,9 +10,6 @@ RUN ln -fs /usr/share/zoneinfo/Etc/GMT /etc/localtime && dpkg-reconfigure -f non
RUN update-rc.d cron defaults && systemctl enable rsyslog
ENV R_INSTALL_SCRIPT=libs.r
ADD execution-engine-exec.jar /execution-engine.jar
RUN mkdir /impala
ADD Cloudera_ImpalaJDBC4_2.5.41.zip /impala/impala-jdbc.zip
RUN cd /impala && unzip impala-jdbc.zip && rm -f impala-jdbc.zip
RUN mkdir /runtimes
COPY descriptor_base.json /runtimes/descriptor_base.json
RUN ls -la /runtimes/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.odysseusinc.arachne.executionengine.auth;

import java.util.Map;

public interface AuthEffects {
interface AddEnvironmentVariables extends AuthEffects {
Map<String, String> getEnvVars();
}

interface ModifyUrl extends AuthEffects {
String getNewUrl();
}

interface Cleanup extends AuthEffects {
void cleanup();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.odysseusinc.arachne.executionengine.auth;

import com.google.common.collect.ImmutableMap;
import com.odysseusinc.arachne.commons.types.DBMSType;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

@Service
@Slf4j
public class BigQueryCredentialsProvider implements CredentialsProvider {
private static final String RUNTIME_BQ_KEYFILE = "BQ_KEYFILE";

@Override
public AuthEffects apply(DataSourceUnsecuredDTO dataSource, Path analysisDir, String analysisDirInContainer) {
if (dataSource.getType() == DBMSType.BIGQUERY) {
try {
Path keyFile = Files.createTempFile(analysisDir, "", ".json").toAbsolutePath();
try (OutputStream out = Files.newOutputStream(keyFile)) {
IOUtils.write(dataSource.getKeyfile(), out);
}
Path innerPath = Paths.get(analysisDirInContainer).resolve(keyFile.getFileName());
return new Effects(dataSource.getConnectionString(), keyFile, innerPath.toString());
} catch (IOException e) {
log.error("Failed to resolve BigQuery authentication for Source: [{}]", dataSource.getName(), e);
throw new RuntimeException(e);
}
} else {
return null;
}
}

@AllArgsConstructor
private static class Effects implements AuthEffects, AuthEffects.AddEnvironmentVariables, AuthEffects.ModifyUrl, AuthEffects.Cleanup {
private final String newUrl;
private final Path keyFile;
private final String keyFileInContainer;

@Override
public Map<String, String> getEnvVars() {
return ImmutableMap.of(RUNTIME_BQ_KEYFILE, keyFileInContainer);
}

@Override
public String getNewUrl() {
return newUrl + ";OAuthPvtKeyPath=" + keyFileInContainer + ";";
}

@Override
public void cleanup() {
FileUtils.deleteQuietly(keyFile.toFile());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.odysseusinc.arachne.executionengine.auth;

import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO;

import java.nio.file.Path;

public interface CredentialsProvider {
AuthEffects apply(DataSourceUnsecuredDTO dataSource, Path analysisDir, String analysisDirInContainer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.odysseusinc.arachne.executionengine.auth;

import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.DataSourceUnsecuredDTO;
import com.odysseusinc.datasourcemanager.krblogin.KerberosService;
import com.odysseusinc.datasourcemanager.krblogin.KrbConfig;
import com.odysseusinc.datasourcemanager.krblogin.RuntimeServiceMode;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;

@Service
@Slf4j
public class KerberosCredentialsProvider implements CredentialsProvider {
@Autowired
private KerberosService kerberosService;

@Override
public AuthEffects apply(DataSourceUnsecuredDTO ds, Path analysisDir, String analysisDirInContainer) {
Path keys = analysisDir.resolve("keys");
File keystoreDir = keys.toFile();

if (ds.getUseKerberos()) {
keystoreDir.mkdirs();
try {
KrbConfig config = kerberosService.runKinit(ds, RuntimeServiceMode.SINGLE, keystoreDir);
return new Effects(config, keys);
} catch (IOException e) {
log.error("Failed to resolve Kerberos auth for Datasource: {}", ds.getName(), e);
throw new RuntimeException(e);
}
} else {
return null;
}
}

@AllArgsConstructor
private static class Effects implements AuthEffects, AuthEffects.AddEnvironmentVariables, AuthEffects.Cleanup {
private final KrbConfig config;
private final Path keyFile;

@Override
public Map<String, String> getEnvVars() {
return config.getIsolatedRuntimeEnvs();
}

@Override
public void cleanup() {
FileUtils.deleteQuietly(keyFile.toFile());
FileUtils.deleteQuietly(config.getComponents().getKeytabPath().toFile());
FileUtils.deleteQuietly(config.getConfPath().toFile());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package com.odysseusinc.arachne.executionengine.config;

import com.odysseusinc.datasourcemanager.jdbc.auth.BigQueryAuthResolver;
import com.odysseusinc.datasourcemanager.krblogin.KerberosService;
import com.odysseusinc.datasourcemanager.krblogin.KerberosServiceImpl;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -69,4 +70,9 @@ public KerberosService kerberosService() {
return new KerberosServiceImpl(timeout, kinitPath, configPath);
}

@Bean
public BigQueryAuthResolver bigQueryAuthResolver() {
return new BigQueryAuthResolver();
};

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisSyncRequestDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage;
import com.odysseusinc.arachne.execution_engine_common.descriptor.dto.DockerEnvironmentDTO;
import com.odysseusinc.arachne.executionengine.auth.AuthEffects;
import com.odysseusinc.arachne.executionengine.config.properties.DockerRegistryProperties;
import com.odysseusinc.arachne.executionengine.execution.Overseer;
import com.odysseusinc.arachne.executionengine.util.Streams;
import com.odysseusinc.datasourcemanager.krblogin.KrbConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -95,9 +95,9 @@ public DockerService(DockerRegistryProperties properties) {

@Override
protected Overseer analyze(
AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer<String, String> sendCallback, Integer updateInterval, KrbConfig krbConfig
AnalysisSyncRequestDTO analysis, File analysisDir, BiConsumer<String, String> sendCallback, Integer updateInterval, AuthEffects auth
) {
List<String> env = buildRuntimeEnvVariables(analysis.getDataSource(), krbConfig.getIsolatedRuntimeEnvs()).entrySet().stream().map(entry ->
List<String> env = buildRuntimeEnvVariables(analysis.getDataSource(), auth).entrySet().stream().map(entry ->
entry.getKey() + "=" + entry.getValue()).collect(Collectors.toList()
);
Instant started = Instant.now();
Expand Down
Loading

0 comments on commit cc5549e

Please sign in to comment.