diff --git a/.dockerignore b/.dockerignore index 820435ad6acc..cc026d5dc6c1 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,7 @@ .dockerignore .git .idea +.gradle **/build **/node_modules Dockerfile.* diff --git a/.env b/.env index 9b716bea4036..f531bf5d9a2e 100644 --- a/.env +++ b/.env @@ -4,4 +4,5 @@ DATABASE_USER=docker DATABASE_PASSWORD=docker DATABASE_DB=dataline DATABASE_URL=jdbc:postgresql://db:5432/dataline -CONFIG_PERSISTENCE_ROOT=data/config +CONFIG_ROOT=data/config +WORKSPACE_ROOT=/tmp/workspace diff --git a/Dockerfile.build b/Dockerfile.build index 235c67b7422b..c55a5f96be11 100644 --- a/Dockerfile.build +++ b/Dockerfile.build @@ -3,6 +3,10 @@ ##################### FROM ubuntu:20.04 AS build-base +WORKDIR /code + +ENV DEBIAN_FRONTEND noninteractive + # Install tools RUN apt-get update && apt-get -y install curl @@ -10,7 +14,8 @@ RUN apt-get update && apt-get -y install curl RUN curl -sL https://deb.nodesource.com/setup_14.x | bash - RUN apt-get update && apt-get -y install \ nodejs \ - openjdk-14-jdk + openjdk-14-jdk \ + docker.io ####################### # Prepare project env # @@ -29,6 +34,8 @@ RUN ./gradlew build --no-daemon -g /home/gradle/.gradle ################### FROM build-project AS build +WORKDIR /code + # Copy code, etc. COPY . /code diff --git a/dataline-config-persistence/src/main/java/io/dataline/config/persistence/DefaultConfigPersistence.java b/dataline-config-persistence/src/main/java/io/dataline/config/persistence/DefaultConfigPersistence.java index 9d24ba2cb05c..20fa0195a4eb 100644 --- a/dataline-config-persistence/src/main/java/io/dataline/config/persistence/DefaultConfigPersistence.java +++ b/dataline-config-persistence/src/main/java/io/dataline/config/persistence/DefaultConfigPersistence.java @@ -43,13 +43,14 @@ // we force all interaction with disk storage to be effectively single threaded. public class DefaultConfigPersistence implements ConfigPersistence { + private static final Object lock = new Object(); private final ObjectMapper objectMapper; private final JsonSchemaValidation jsonSchemaValidation; - private final String storageRoot; + private final Path storageRoot; - public DefaultConfigPersistence(String storageRoot) { + public DefaultConfigPersistence(Path storageRoot) { this.storageRoot = storageRoot; jsonSchemaValidation = new JsonSchemaValidation(); objectMapper = new ObjectMapper(); @@ -145,7 +146,7 @@ private Set getFiles(PersistenceConfigType persistenceConfigType) { } private Path getConfigDirectory(PersistenceConfigType persistenceConfigType) { - return Path.of(storageRoot).resolve(persistenceConfigType.toString()); + return storageRoot.resolve(persistenceConfigType.toString()); } private Path getConfigPath(PersistenceConfigType persistenceConfigType, String configId) { diff --git a/dataline-config-persistence/src/test/java/io/dataline/config/persistence/DefaultConfigPersistenceTest.java b/dataline-config-persistence/src/test/java/io/dataline/config/persistence/DefaultConfigPersistenceTest.java index b454fd98e813..ec83c9373062 100644 --- a/dataline-config-persistence/src/test/java/io/dataline/config/persistence/DefaultConfigPersistenceTest.java +++ b/dataline-config-persistence/src/test/java/io/dataline/config/persistence/DefaultConfigPersistenceTest.java @@ -47,7 +47,7 @@ class DefaultConfigPersistenceTest { @BeforeEach void setUp() throws IOException { rootPath = Files.createTempDirectory(DefaultConfigPersistenceTest.class.getName()); - configPersistence = new DefaultConfigPersistence(rootPath.toString()); + configPersistence = new DefaultConfigPersistence(rootPath); } private StandardSource generateStandardSource() { diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java b/dataline-config/src/main/java/io/dataline/config/Configs.java similarity index 76% rename from dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java rename to dataline-config/src/main/java/io/dataline/config/Configs.java index 8da229a0cd54..679cd9633ed6 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java +++ b/dataline-config/src/main/java/io/dataline/config/Configs.java @@ -22,14 +22,17 @@ * SOFTWARE. */ -package io.dataline.workers.singer.postgres_tap; +package io.dataline.config; -import io.dataline.integrations.Integrations; -import io.dataline.workers.singer.SingerDiscoverSchemaWorker; +import java.nio.file.Path; -public class SingerPostgresTapDiscoverWorker extends SingerDiscoverSchemaWorker { +public interface Configs { - public SingerPostgresTapDiscoverWorker() { - super(Integrations.POSTGRES_TAP.getSyncImage()); - } + Path getConfigRoot(); + + Path getWorkspaceRoot(); + + String getWorkspaceDockerMount(); + + String getDockerNetwork(); } diff --git a/dataline-config/src/main/java/io/dataline/config/EnvConfigs.java b/dataline-config/src/main/java/io/dataline/config/EnvConfigs.java new file mode 100644 index 000000000000..3b261bd17ac9 --- /dev/null +++ b/dataline-config/src/main/java/io/dataline/config/EnvConfigs.java @@ -0,0 +1,93 @@ +/* + * MIT License + * + * Copyright (c) 2020 Dataline + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.dataline.config; + +import java.nio.file.Path; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EnvConfigs implements Configs { + + private static final Logger LOGGER = LoggerFactory.getLogger(EnvConfigs.class); + + public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT"; + public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT"; + public static final String CONFIG_ROOT = "CONFIG_ROOT"; + public static final String DOCKER_NETWORK = "DOCKER_NETWORK"; + + public static final String DEFAULT_NETWORK = "host"; + + private final Function getEnv; + + public EnvConfigs() { + this(System::getenv); + } + + EnvConfigs(final Function getEnv) { + this.getEnv = getEnv; + } + + @Override + public Path getConfigRoot() { + return getPath(CONFIG_ROOT); + } + + @Override + public Path getWorkspaceRoot() { + return getPath(WORKSPACE_ROOT); + } + + @Override + public String getWorkspaceDockerMount() { + final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT); + + if (mount != null) { + return mount; + } + + LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT); + return getWorkspaceRoot().toString(); + } + + @Override + public String getDockerNetwork() { + final String network = getEnv.apply(DOCKER_NETWORK); + if (network != null) { + return network; + } + + LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK); + return DEFAULT_NETWORK; + } + + private Path getPath(final String name) { + final String value = getEnv.apply(name); + if (value == null) { + throw new IllegalArgumentException("Env variable not defined: " + name); + } + return Path.of(value); + } +} diff --git a/dataline-config/src/test/java/io/dataline/config/EnvConfigsTest.java b/dataline-config/src/test/java/io/dataline/config/EnvConfigsTest.java new file mode 100644 index 000000000000..a3823ca75cfc --- /dev/null +++ b/dataline-config/src/test/java/io/dataline/config/EnvConfigsTest.java @@ -0,0 +1,93 @@ +/* + * MIT License + * + * Copyright (c) 2020 Dataline + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.dataline.config; + +import static org.mockito.Mockito.when; + +import java.nio.file.Paths; +import java.util.function.Function; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class EnvConfigsTest { + + private Function function; + private EnvConfigs config; + + @BeforeEach + void setUp() { + function = Mockito.mock(Function.class); + config = new EnvConfigs(function); + } + + @Test + void ensureGetEnvBehavior() { + Assertions.assertNull(System.getenv("MY_RANDOM_VAR_1234")); + } + + @Test + void testWorkspaceRoot() { + when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null); + Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceRoot()); + + when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def"); + Assertions.assertEquals(Paths.get("abc/def"), config.getWorkspaceRoot()); + } + + @Test + void testConfigRoot() { + when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn(null); + Assertions.assertThrows(IllegalArgumentException.class, () -> config.getConfigRoot()); + + when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn("a/b"); + Assertions.assertEquals(Paths.get("a/b"), config.getConfigRoot()); + } + + @Test + void testGetDockerMount() { + when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null); + when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def"); + Assertions.assertEquals("abc/def", config.getWorkspaceDockerMount()); + + when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn("root"); + when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null); + Assertions.assertEquals("root", config.getWorkspaceDockerMount()); + + when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null); + when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null); + Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceDockerMount()); + } + + @Test + void testDockerNetwork() { + when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn(null); + Assertions.assertEquals("host", config.getDockerNetwork()); + + when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc"); + Assertions.assertEquals("abc", config.getDockerNetwork()); + } +} diff --git a/dataline-integrations/singer/csv/destination/Dockerfile b/dataline-integrations/singer/csv/destination/Dockerfile index f126fa501823..a4b66ff3d330 100644 --- a/dataline-integrations/singer/csv/destination/Dockerfile +++ b/dataline-integrations/singer/csv/destination/Dockerfile @@ -11,6 +11,4 @@ COPY requirements.txt . RUN python -m pip install --upgrade pip && \ pip install -r requirements.txt -WORKDIR /singer/data - ENTRYPOINT ["target-csv"] diff --git a/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile b/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile index 8bde97fc6133..1c4fe29b0559 100644 --- a/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile +++ b/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile @@ -11,6 +11,4 @@ COPY requirements.txt . RUN python -m pip install --upgrade pip && \ pip install -r requirements.txt -WORKDIR /singer/data - ENTRYPOINT ["tap-exchangeratesapi"] diff --git a/dataline-integrations/singer/postgres/destination/Dockerfile b/dataline-integrations/singer/postgres/destination/Dockerfile index d2e1b9df9a40..b340b130fc9c 100644 --- a/dataline-integrations/singer/postgres/destination/Dockerfile +++ b/dataline-integrations/singer/postgres/destination/Dockerfile @@ -17,5 +17,4 @@ RUN apt-get update && \ python -m pip install --upgrade pip && \ pip install -r requirements.txt -WORKDIR /singer/data ENTRYPOINT ["target-postgres"] diff --git a/dataline-integrations/singer/postgres/source/Dockerfile b/dataline-integrations/singer/postgres/source/Dockerfile index df075e01d9e6..948052a30ede 100644 --- a/dataline-integrations/singer/postgres/source/Dockerfile +++ b/dataline-integrations/singer/postgres/source/Dockerfile @@ -17,6 +17,4 @@ RUN python -m pip install --upgrade pip && \ RUN apt-get autoremove -y gcc -WORKDIR /singer/data - ENTRYPOINT ["tap-postgres"] diff --git a/dataline-scheduler/build.gradle b/dataline-scheduler/build.gradle index 6e176b6d2839..3bfb00fa2ab3 100644 --- a/dataline-scheduler/build.gradle +++ b/dataline-scheduler/build.gradle @@ -18,7 +18,7 @@ application { run { // default for running on local machine. - environment "CONFIG_PERSISTENCE_ROOT", new File(".").absolutePath + "data/config" + environment "CONFIG_ROOT", new File(".").absolutePath + "data/config" environment "VERSION", "0.1.0" environment "DATABASE_USER", "postgres" environment "DATABASE_PASSWORD", "" diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java index 99f80406a2e4..22c0dc654b83 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobScheduler.java @@ -132,7 +132,7 @@ private static Long getSecondsInUnit(Schedule.TimeUnit timeUnitEnum) { } private static Long getIntervalInSeconds(Schedule schedule) { - return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits().longValue(); + return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits(); } private Set getAllActiveConnections() { diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java index 12329a633518..7bebe87215d6 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java @@ -25,6 +25,8 @@ package io.dataline.scheduler; import io.dataline.db.DatabaseHelper; +import io.dataline.workers.process.ProcessBuilderFactory; +import java.nio.file.Path; import java.sql.SQLException; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -40,14 +42,20 @@ public class JobSubmitter implements Runnable { private final ExecutorService threadPool; private final BasicDataSource connectionPool; private final SchedulerPersistence persistence; + private final Path workspaceRoot; + private final ProcessBuilderFactory pbf; public JobSubmitter( - ExecutorService threadPool, - BasicDataSource connectionPool, - SchedulerPersistence persistence) { + final ExecutorService threadPool, + final BasicDataSource connectionPool, + final SchedulerPersistence persistence, + final Path workspaceRoot, + final ProcessBuilderFactory pbf) { this.threadPool = threadPool; this.connectionPool = connectionPool; this.persistence = persistence; + this.workspaceRoot = workspaceRoot; + this.pbf = pbf; } @Override @@ -86,6 +94,7 @@ private Optional getOldestPendingJob() throws SQLException { } private void submitJob(Job job) { - threadPool.submit(new WorkerRunner(job.getId(), connectionPool, persistence)); + threadPool.submit( + new WorkerRunner(job.getId(), connectionPool, persistence, workspaceRoot, pbf)); } } diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java index f676e136aa3c..307ab9e8df7a 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/SchedulerApp.java @@ -25,9 +25,14 @@ package io.dataline.scheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.dataline.config.Configs; +import io.dataline.config.EnvConfigs; import io.dataline.config.persistence.ConfigPersistence; import io.dataline.config.persistence.DefaultConfigPersistence; import io.dataline.db.DatabaseHelper; +import io.dataline.workers.process.DockerProcessBuilderFactory; +import io.dataline.workers.process.ProcessBuilderFactory; +import java.nio.file.Path; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -44,6 +49,7 @@ * launching new jobs. */ public class SchedulerApp { + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final int MAX_WORKERS = 4; @@ -52,23 +58,32 @@ public class SchedulerApp { new ThreadFactoryBuilder().setNameFormat("scheduler-%d").build(); private final BasicDataSource connectionPool; - private final String configPersistenceRoot; + private final Path configRoot; + private final Path workspaceRoot; + private final ProcessBuilderFactory pbf; - public SchedulerApp(BasicDataSource connectionPool, String configPersistenceRoot) { + public SchedulerApp( + BasicDataSource connectionPool, + Path configRoot, + Path workspaceRoot, + ProcessBuilderFactory pbf) { this.connectionPool = connectionPool; - this.configPersistenceRoot = configPersistenceRoot; + this.configRoot = configRoot; + this.workspaceRoot = workspaceRoot; + this.pbf = pbf; } public void start() { final SchedulerPersistence schedulerPersistence = new DefaultSchedulerPersistence(connectionPool); - final ConfigPersistence configPersistence = new DefaultConfigPersistence(configPersistenceRoot); + final ConfigPersistence configPersistence = new DefaultConfigPersistence(configRoot); final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); final JobSubmitter jobSubmitter = - new JobSubmitter(workerThreadPool, connectionPool, schedulerPersistence); + new JobSubmitter( + workerThreadPool, connectionPool, schedulerPersistence, workspaceRoot, pbf); final JobScheduler jobScheduler = new JobScheduler(connectionPool, schedulerPersistence, configPersistence); @@ -86,13 +101,22 @@ public void start() { } public static void main(String[] args) { - final String configPersistenceRoot = System.getenv("CONFIG_PERSISTENCE_ROOT"); - LOGGER.info("configPersistenceRoot = " + configPersistenceRoot); + final Configs configs = new EnvConfigs(); + + final Path configRoot = configs.getConfigRoot(); + LOGGER.info("configRoot = " + configRoot); + + final Path workspaceRoot = configs.getWorkspaceRoot(); + LOGGER.info("workspaceRoot = " + workspaceRoot); LOGGER.info("Creating DB connection pool..."); - BasicDataSource connectionPool = DatabaseHelper.getConnectionPoolFromEnv(); + final BasicDataSource connectionPool = DatabaseHelper.getConnectionPoolFromEnv(); + + final ProcessBuilderFactory pbf = + new DockerProcessBuilderFactory( + workspaceRoot, configs.getWorkspaceDockerMount(), configs.getDockerNetwork()); LOGGER.info("Launching scheduler..."); - new SchedulerApp(connectionPool, configPersistenceRoot).start(); + new SchedulerApp(connectionPool, configRoot, workspaceRoot, pbf).start(); } } diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRun.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRun.java index fe4db5a65bd3..f0d07791900b 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRun.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRun.java @@ -29,13 +29,13 @@ import io.dataline.db.DatabaseHelper; import io.dataline.workers.OutputAndStatus; import io.dataline.workers.Worker; +import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +57,19 @@ public class WorkerRun implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRun.class); private final long jobId; + private final Path jobRoot; private final InputType input; private final Worker worker; private final BasicDataSource connectionPool; public WorkerRun( long jobId, + Path jobRoot, InputType input, Worker worker, BasicDataSource connectionPool) { this.jobId = jobId; + this.jobRoot = jobRoot; this.input = input; this.worker = worker; this.connectionPool = connectionPool; @@ -78,13 +81,9 @@ public void run() { try { setJobStatus(connectionPool, jobId, JobStatus.RUNNING); - // todo (cgardens) - replace this with whatever the correct path is. probably dependency - // inject it based via env. - final Path workspacesRoot = Path.of("/tmp/dataline/workspaces/"); - FileUtils.forceMkdir(workspacesRoot.toFile()); - final Path workspaceRoot = workspacesRoot.resolve(String.valueOf(jobId)); - FileUtils.forceMkdir(workspaceRoot.toFile()); - OutputAndStatus outputAndStatus = worker.run(input, workspaceRoot); + Files.createDirectories(jobRoot); + + OutputAndStatus outputAndStatus = worker.run(input, jobRoot); switch (outputAndStatus.getStatus()) { case FAILED: diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRunner.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRunner.java index e612dc06901c..e6e0f6b343b9 100644 --- a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRunner.java +++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerRunner.java @@ -31,11 +31,13 @@ import io.dataline.config.StandardDiscoverSchemaInput; import io.dataline.config.StandardSyncInput; import io.dataline.workers.DefaultSyncWorker; +import io.dataline.workers.process.ProcessBuilderFactory; import io.dataline.workers.singer.SingerCheckConnectionWorker; import io.dataline.workers.singer.SingerDiscoverSchemaWorker; import io.dataline.workers.singer.SingerTapFactory; import io.dataline.workers.singer.SingerTargetFactory; import java.io.IOException; +import java.nio.file.Path; import org.apache.commons.dbcp2.BasicDataSource; /** @@ -43,15 +45,24 @@ * appropriate worker for a given job. */ public class WorkerRunner implements Runnable { + private final long jobId; private final BasicDataSource connectionPool; private final SchedulerPersistence persistence; + private final Path workspaceRoot; + private final ProcessBuilderFactory pbf; public WorkerRunner( - long jobId, BasicDataSource connectionPool, SchedulerPersistence persistence) { + long jobId, + BasicDataSource connectionPool, + SchedulerPersistence persistence, + Path workspaceRoot, + ProcessBuilderFactory pbf) { this.jobId = jobId; this.connectionPool = connectionPool; this.persistence = persistence; + this.workspaceRoot = workspaceRoot; + this.pbf = pbf; } @Override @@ -63,6 +74,8 @@ public void run() { throw new RuntimeException(e); } + final Path jobRoot = workspaceRoot.resolve(String.valueOf(jobId)); + switch (job.getConfig().getConfigType()) { case CHECK_CONNECTION_SOURCE: case CHECK_CONNECTION_DESTINATION: @@ -70,9 +83,10 @@ public void run() { getCheckConnectionInput(job.getConfig().getCheckConnection()); new WorkerRun<>( jobId, + jobRoot, checkConnectionInput, new SingerCheckConnectionWorker( - job.getConfig().getCheckConnection().getDockerImage()), + job.getConfig().getCheckConnection().getDockerImage(), pbf), connectionPool) .run(); break; @@ -81,9 +95,10 @@ public void run() { getDiscoverSchemaInput(job.getConfig().getDiscoverSchema()); new WorkerRun<>( jobId, + jobRoot, discoverSchemaInput, new SingerDiscoverSchemaWorker( - job.getConfig().getDiscoverSchema().getDockerImage()), + job.getConfig().getDiscoverSchema().getDockerImage(), pbf), connectionPool) .run(); break; @@ -91,14 +106,16 @@ public void run() { final StandardSyncInput syncInput = getSyncInput(job.getConfig().getSync()); new WorkerRun<>( jobId, + jobRoot, syncInput, // todo (cgardens) - still locked into only using SingerTaps and Targets. Next step // here is to create DefaultTap and DefaultTarget which will be able to // interoperate with SingerTap and SingerTarget now that they are split and // mediated in DefaultSyncWorker. new DefaultSyncWorker( - new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage()), - new SingerTargetFactory(job.getConfig().getSync().getDestinationDockerImage())), + new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage(), pbf), + new SingerTargetFactory( + job.getConfig().getSync().getDestinationDockerImage(), pbf)), connectionPool) .run(); break; diff --git a/dataline-server/build.gradle b/dataline-server/build.gradle index a1a381c25f18..c81efd580e1a 100644 --- a/dataline-server/build.gradle +++ b/dataline-server/build.gradle @@ -30,7 +30,7 @@ application { run { // default for running on local machine. - environment "CONFIG_PERSISTENCE_ROOT", new File(".").absolutePath + "data/config" + environment "CONFIG_ROOT", new File(".").absolutePath + "data/config" environment "VERSION", "0.1.0" environment "DATABASE_USER", "postgres" environment "DATABASE_PASSWORD", "" diff --git a/dataline-server/src/main/java/io/dataline/server/ConfigurationApiFactory.java b/dataline-server/src/main/java/io/dataline/server/ConfigurationApiFactory.java index b296e75e3193..d479c84f1adf 100644 --- a/dataline-server/src/main/java/io/dataline/server/ConfigurationApiFactory.java +++ b/dataline-server/src/main/java/io/dataline/server/ConfigurationApiFactory.java @@ -25,18 +25,20 @@ package io.dataline.server; import io.dataline.server.apis.ConfigurationApi; +import java.nio.file.Path; import org.apache.commons.dbcp2.BasicDataSource; import org.glassfish.hk2.api.Factory; public class ConfigurationApiFactory implements Factory { - private static String dbRoot; + + private static Path dbRoot; private static BasicDataSource connectionPool; - public static void setConfigPersistenceRoot(String dbRoot) { + public static void setConfigPersistenceRoot(final Path dbRoot) { ConfigurationApiFactory.dbRoot = dbRoot; } - public static void setDbConnectionPool(BasicDataSource connectionPool) { + public static void setDbConnectionPool(final BasicDataSource connectionPool) { ConfigurationApiFactory.connectionPool = connectionPool; } diff --git a/dataline-server/src/main/java/io/dataline/server/ServerApp.java b/dataline-server/src/main/java/io/dataline/server/ServerApp.java index 95d227d5edfe..988bd7ec5d58 100644 --- a/dataline-server/src/main/java/io/dataline/server/ServerApp.java +++ b/dataline-server/src/main/java/io/dataline/server/ServerApp.java @@ -24,6 +24,8 @@ package io.dataline.server; +import io.dataline.config.Configs; +import io.dataline.config.EnvConfigs; import io.dataline.db.DatabaseHelper; import io.dataline.server.apis.ConfigurationApi; import io.dataline.server.errors.InvalidInputExceptionMapper; @@ -31,6 +33,7 @@ import io.dataline.server.errors.InvalidJsonInputExceptionMapper; import io.dataline.server.errors.KnownExceptionMapper; import io.dataline.server.errors.UncaughtExceptionMapper; +import java.nio.file.Path; import java.util.logging.Level; import org.apache.commons.dbcp2.BasicDataSource; import org.eclipse.jetty.server.Server; @@ -46,12 +49,13 @@ import org.slf4j.LoggerFactory; public class ServerApp { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class); - private final String configPersistenceRoot; - public ServerApp(String configPersistenceRoot) { + private final Path configRoot; - this.configPersistenceRoot = configPersistenceRoot; + public ServerApp(final Path configRoot) { + this.configRoot = configRoot; } public void start() throws Exception { @@ -61,7 +65,7 @@ public void start() throws Exception { ServletContextHandler handler = new ServletContextHandler(); - ConfigurationApiFactory.setConfigPersistenceRoot(configPersistenceRoot); + ConfigurationApiFactory.setConfigPersistenceRoot(configRoot); ConfigurationApiFactory.setDbConnectionPool(connectionPool); ResourceConfig rc = @@ -110,10 +114,12 @@ public void configure() { } public static void main(String[] args) throws Exception { - final String configPersistenceRoot = System.getenv("CONFIG_PERSISTENCE_ROOT"); - LOGGER.info("configPersistenceRoot = " + configPersistenceRoot); + final Configs configs = new EnvConfigs(); + + final Path configRoot = configs.getConfigRoot(); + LOGGER.info("configRoot = " + configRoot); LOGGER.info("Starting server..."); - new ServerApp(configPersistenceRoot).start(); + new ServerApp(configRoot).start(); } } diff --git a/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java b/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java index 41c376819385..9c7eb3a5f193 100644 --- a/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java +++ b/dataline-server/src/main/java/io/dataline/server/apis/ConfigurationApi.java @@ -73,11 +73,11 @@ import io.dataline.server.handlers.SourcesHandler; import io.dataline.server.handlers.WorkspacesHandler; import io.dataline.server.validation.IntegrationSchemaValidation; +import java.nio.file.Path; import javax.validation.Valid; -import javax.ws.rs.Path; import org.apache.commons.dbcp2.BasicDataSource; -@Path("/v1") +@javax.ws.rs.Path("/v1") public class ConfigurationApi implements io.dataline.api.V1Api { private final WorkspacesHandler workspacesHandler; private final SourcesHandler sourcesHandler; @@ -90,7 +90,7 @@ public class ConfigurationApi implements io.dataline.api.V1Api { private final SchedulerHandler schedulerHandler; private final JobHistoryHandler jobHistoryHandler; - public ConfigurationApi(String dbRoot, BasicDataSource connectionPool) { + public ConfigurationApi(final Path dbRoot, BasicDataSource connectionPool) { ConfigPersistence configPersistence = new DefaultConfigPersistence(dbRoot); final IntegrationSchemaValidation integrationSchemaValidation = new IntegrationSchemaValidation(configPersistence); diff --git a/dataline-workers/src/main/java/io/dataline/workers/DefaultSyncWorker.java b/dataline-workers/src/main/java/io/dataline/workers/DefaultSyncWorker.java index ddbb22828432..7ee9de8c2e74 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/DefaultSyncWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/DefaultSyncWorker.java @@ -58,7 +58,7 @@ public DefaultSyncWorker( } @Override - public OutputAndStatus run(StandardSyncInput syncInput, Path workspacePath) { + public OutputAndStatus run(StandardSyncInput syncInput, Path jobRoot) { long startTime = System.currentTimeMillis(); final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput); @@ -67,17 +67,17 @@ public OutputAndStatus run(StandardSyncInput syncInput, Path final SingerMessageTracker singerMessageTracker = new SingerMessageTracker(syncInput.getStandardSync().getConnectionId()); - try (Stream tap = singerTapFactory.create(tapConfig, workspacePath); + try (Stream tap = singerTapFactory.create(tapConfig, jobRoot); CloseableConsumer consumer = - singerTargetFactory.create(targetConfig, workspacePath)) { + singerTargetFactory.create(targetConfig, jobRoot)) { tap.takeWhile(record -> !cancelled.get()).peek(singerMessageTracker).forEach(consumer); } catch (Exception e) { LOGGER.debug( "Sync worker failed. Tap error log: {}.\n Target error log: {}", - WorkerUtils.readFileFromWorkspace(workspacePath, TAP_ERR_LOG), - WorkerUtils.readFileFromWorkspace(workspacePath, TARGET_ERR_LOG)); + WorkerUtils.readFileFromWorkspace(jobRoot, TAP_ERR_LOG), + WorkerUtils.readFileFromWorkspace(jobRoot, TARGET_ERR_LOG)); return new OutputAndStatus<>(JobStatus.FAILED, null); } diff --git a/dataline-workers/src/main/java/io/dataline/workers/DockerCheckConnectionWorker.java b/dataline-workers/src/main/java/io/dataline/workers/DockerCheckConnectionWorker.java index cf067ddb18f4..bcf69895c6d1 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/DockerCheckConnectionWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/DockerCheckConnectionWorker.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.dataline.config.StandardCheckConnectionInput; import io.dataline.config.StandardCheckConnectionOutput; +import io.dataline.workers.process.ProcessBuilderFactory; import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.TimeUnit; @@ -35,22 +36,25 @@ import org.slf4j.LoggerFactory; public class DockerCheckConnectionWorker implements CheckConnectionWorker { + private static final Logger LOGGER = LoggerFactory.getLogger(DockerCheckConnectionWorker.class); private static final String INPUT = "input.json"; private static final String OUTPUT = "output.json"; private final String imageName; + private final ProcessBuilderFactory pbf; Process tapProcess; - public DockerCheckConnectionWorker(String imageName) { + public DockerCheckConnectionWorker(final String imageName, final ProcessBuilderFactory pbf) { this.imageName = imageName; + this.pbf = pbf; } @Override public OutputAndStatus run( - StandardCheckConnectionInput standardCheckConnectionInput, Path workspacePath) { + StandardCheckConnectionInput standardCheckConnectionInput, Path jobRoot) { final ObjectMapper objectMapper = new ObjectMapper(); // write input struct to docker image @@ -60,25 +64,19 @@ public OutputAndStatus run( } catch (JsonProcessingException e) { throw new RuntimeException(e); } - final Path configPath = - WorkerUtils.writeFileToWorkspace(workspacePath, INPUT, inputString); // wrong type + + WorkerUtils.writeFileToWorkspace(jobRoot, INPUT, inputString); // wrong type // run it. patiently. try { - String[] tapCmd = { - "docker", "run", workspacePath.toString(), imageName, "--config", configPath.toString() - }; - - LOGGER.debug("Tap command: {}", String.join(" ", tapCmd)); - - tapProcess = new ProcessBuilder().command(tapCmd).start(); + tapProcess = pbf.create(jobRoot, imageName, "--config", INPUT).start(); while (!tapProcess.waitFor(1, TimeUnit.MINUTES)) { LOGGER.debug("Waiting for worker"); } // read output struct. assume it is written to correct place. - final String outputString = WorkerUtils.readFileFromWorkspace(workspacePath, OUTPUT); + final String outputString = WorkerUtils.readFileFromWorkspace(jobRoot, OUTPUT); final StandardCheckConnectionOutput standardCheckConnectionOutput = objectMapper.readValue(outputString, StandardCheckConnectionOutput.class); diff --git a/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java b/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java index 362643e68ee2..a8a6762240e9 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java @@ -34,8 +34,8 @@ public class EchoWorker implements Worker { public EchoWorker() {} @Override - public OutputAndStatus run(String string, Path workspaceRoot) { - LOGGER.info("Hello World. input: {}, workspace root: {}", string, workspaceRoot); + public OutputAndStatus run(String string, Path jobRoot) { + LOGGER.info("Hello World. input: {}, workspace root: {}", string, jobRoot); return new OutputAndStatus<>(JobStatus.SUCCESSFUL, "echoed"); } diff --git a/dataline-workers/src/main/java/io/dataline/workers/Worker.java b/dataline-workers/src/main/java/io/dataline/workers/Worker.java index 9c53a6e820de..a9aecf494d83 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/Worker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/Worker.java @@ -31,7 +31,7 @@ public interface Worker { * Blocking call to run the worker's workflow. Once this is complete, getStatus should return * either COMPLETE, FAILED, or CANCELLED. */ - OutputAndStatus run(InputType inputType, Path workspacePath) + OutputAndStatus run(InputType inputType, Path jobRoot) throws InvalidCredentialsException, InvalidCatalogException; void cancel(); diff --git a/dataline-workers/src/main/java/io/dataline/workers/process/DockerProcessBuilderFactory.java b/dataline-workers/src/main/java/io/dataline/workers/process/DockerProcessBuilderFactory.java new file mode 100644 index 000000000000..7445568b106f --- /dev/null +++ b/dataline-workers/src/main/java/io/dataline/workers/process/DockerProcessBuilderFactory.java @@ -0,0 +1,74 @@ +/* + * MIT License + * + * Copyright (c) 2020 Dataline + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.dataline.workers.process; + +import com.google.common.collect.Lists; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DockerProcessBuilderFactory implements ProcessBuilderFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessBuilderFactory.class); + + private static final Path MOUNT_DESTINATION = Path.of("/data"); + + private final String mountSource; + private final Path workspaceRoot; + private final String networkName; + + public DockerProcessBuilderFactory(Path workspaceRoot, String mountSource, String networkName) { + this.mountSource = mountSource; + this.workspaceRoot = workspaceRoot; + this.networkName = networkName; + } + + @Override + public ProcessBuilder create(final Path jobRoot, final String imageName, final String... args) { + final List cmd = + Lists.newArrayList( + "docker", + "run", + "-v", + String.format("%s:%s", mountSource, MOUNT_DESTINATION), + "-w", + rebasePath(jobRoot).toString(), + "--network", + networkName, + imageName); + cmd.addAll(Arrays.asList(args)); + + LOGGER.debug("Preparing command: {}", cmd); + + return new ProcessBuilder(cmd); + } + + private Path rebasePath(final Path jobRoot) { + final Path relativePath = workspaceRoot.relativize(jobRoot); + return MOUNT_DESTINATION.resolve(relativePath); + } +} diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java b/dataline-workers/src/main/java/io/dataline/workers/process/ProcessBuilderFactory.java similarity index 75% rename from dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java rename to dataline-workers/src/main/java/io/dataline/workers/process/ProcessBuilderFactory.java index 4ce8afa19185..526b950c054a 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/process/ProcessBuilderFactory.java @@ -22,14 +22,11 @@ * SOFTWARE. */ -package io.dataline.workers.singer.postgres_tap; +package io.dataline.workers.process; -import io.dataline.integrations.Integrations; -import io.dataline.workers.singer.SingerCheckConnectionWorker; +import java.nio.file.Path; -public class SingerPostgresTapCheckConnectionWorker extends SingerCheckConnectionWorker { +public interface ProcessBuilderFactory { - public SingerPostgresTapCheckConnectionWorker() { - super(Integrations.POSTGRES_TAP.getCheckConnectionImage()); - } + ProcessBuilder create(Path jobPath, String imageName, String... args); } diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java index bffed8ecd21f..ecad376dd3fa 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java @@ -32,6 +32,7 @@ import io.dataline.workers.InvalidCredentialsException; import io.dataline.workers.JobStatus; import io.dataline.workers.OutputAndStatus; +import io.dataline.workers.process.ProcessBuilderFactory; import java.nio.file.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,19 +45,19 @@ public class SingerCheckConnectionWorker private final SingerDiscoverSchemaWorker singerDiscoverSchemaWorker; - public SingerCheckConnectionWorker(String imageName) { - this.singerDiscoverSchemaWorker = new SingerDiscoverSchemaWorker(imageName); + public SingerCheckConnectionWorker(final String imageName, final ProcessBuilderFactory pbf) { + this.singerDiscoverSchemaWorker = new SingerDiscoverSchemaWorker(imageName, pbf); } @Override public OutputAndStatus run( - StandardCheckConnectionInput input, Path workspaceRoot) throws InvalidCredentialsException { + StandardCheckConnectionInput input, Path jobRoot) throws InvalidCredentialsException { final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput(); discoverSchemaInput.setConnectionConfiguration(input.getConnectionConfiguration()); OutputAndStatus outputAndStatus = - singerDiscoverSchemaWorker.run(discoverSchemaInput, workspaceRoot); + singerDiscoverSchemaWorker.run(discoverSchemaInput, jobRoot); StandardCheckConnectionOutput output = new StandardCheckConnectionOutput(); JobStatus jobStatus; if (outputAndStatus.getStatus() == JobStatus.SUCCESSFUL diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoverSchemaWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoverSchemaWorker.java index 6a9e022db571..582c5606454d 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoverSchemaWorker.java +++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoverSchemaWorker.java @@ -37,7 +37,7 @@ import io.dataline.workers.InvalidCredentialsException; import io.dataline.workers.JobStatus; import io.dataline.workers.OutputAndStatus; -import io.dataline.workers.utils.DockerUtils; +import io.dataline.workers.process.ProcessBuilderFactory; import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.TimeUnit; @@ -55,17 +55,20 @@ public class SingerDiscoverSchemaWorker private static String CATALOG_JSON_FILENAME = "catalog.json"; private static String ERROR_LOG_FILENAME = "err.log"; - private volatile Process workerProcess; private final String imageName; + private final ProcessBuilderFactory pbf; + + private volatile Process workerProcess; - public SingerDiscoverSchemaWorker(String imageName) { + public SingerDiscoverSchemaWorker(final String imageName, final ProcessBuilderFactory pbf) { this.imageName = imageName; + this.pbf = pbf; } // package private since package-local classes need direct access to singer catalog, and the // conversion from SingerSchema to Dataline schema is lossy OutputAndStatus runInternal( - StandardDiscoverSchemaInput discoverSchemaInput, Path workspaceRoot) + StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot) throws InvalidCredentialsException { // todo (cgardens) - just getting original impl to line up with new iface for now. this can be // reduced. @@ -78,25 +81,14 @@ OutputAndStatus runInternal( throw new RuntimeException(e); } - writeFile(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson); + writeFile(jobRoot, CONFIG_JSON_FILENAME, configDotJson); // exec try { - String[] cmd = - DockerUtils.getDockerCommand( - workspaceRoot, - imageName, - "--config", - CONFIG_JSON_FILENAME, - imageName, - "--config", - CONFIG_JSON_FILENAME, - "--discover"); - workerProcess = - new ProcessBuilder(cmd) - .redirectError(getFullPath(workspaceRoot, ERROR_LOG_FILENAME).toFile()) - .redirectOutput(getFullPath(workspaceRoot, CATALOG_JSON_FILENAME).toFile()) + pbf.create(jobRoot, imageName, "--config", CONFIG_JSON_FILENAME, "--discover") + .redirectError(getFullPath(jobRoot, ERROR_LOG_FILENAME).toFile()) + .redirectOutput(getFullPath(jobRoot, CATALOG_JSON_FILENAME).toFile()) .start(); while (!workerProcess.waitFor(1, TimeUnit.MINUTES)) { @@ -105,12 +97,12 @@ OutputAndStatus runInternal( int exitCode = workerProcess.exitValue(); if (exitCode == 0) { - String catalog = readFile(workspaceRoot, CATALOG_JSON_FILENAME); + String catalog = readFile(jobRoot, CATALOG_JSON_FILENAME); final SingerCatalog singerCatalog = jsonCatalogToTyped(catalog); return new OutputAndStatus<>(SUCCESSFUL, singerCatalog); } else { // TODO throw invalid credentials exception where appropriate based on error log - String errLog = readFile(workspaceRoot, ERROR_LOG_FILENAME); + String errLog = readFile(jobRoot, ERROR_LOG_FILENAME); LOGGER.debug( "Discovery job subprocess finished with exit code {}. Error log: {}", exitCode, errLog); return new OutputAndStatus<>(FAILED); @@ -123,9 +115,9 @@ OutputAndStatus runInternal( @Override public OutputAndStatus run( - StandardDiscoverSchemaInput discoverSchemaInput, Path workspaceRoot) + StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot) throws InvalidCredentialsException { - OutputAndStatus output = runInternal(discoverSchemaInput, workspaceRoot); + OutputAndStatus output = runInternal(discoverSchemaInput, jobRoot); JobStatus status = output.getStatus(); OutputAndStatus finalOutput; diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTapFactory.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTapFactory.java index 7b861a8c582c..90078b7a00d6 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTapFactory.java +++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTapFactory.java @@ -36,17 +36,17 @@ import io.dataline.workers.OutputAndStatus; import io.dataline.workers.TapFactory; import io.dataline.workers.WorkerUtils; +import io.dataline.workers.process.ProcessBuilderFactory; import io.dataline.workers.protocol.singer.SingerJsonIterator; -import io.dataline.workers.utils.DockerUtils; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; -import java.util.Arrays; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SingerTapFactory implements TapFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SingerTapFactory.class); private static final String CONFIG_JSON_FILENAME = "tap_config.json"; @@ -54,20 +54,22 @@ public class SingerTapFactory implements TapFactory { private static final String STATE_JSON_FILENAME = "input_state.json"; - private final String dockerImageName; + private final String imageName; + private final ProcessBuilderFactory pbf; private Process tapProcess = null; private InputStream stdout = null; - public SingerTapFactory(String dockerImageName) { - this.dockerImageName = dockerImageName; + public SingerTapFactory(final String imageName, final ProcessBuilderFactory pbf) { + this.imageName = imageName; + this.pbf = pbf; } @SuppressWarnings("UnstableApiUsage") @Override - public Stream create(StandardTapConfig input, Path workspaceRoot) + public Stream create(StandardTapConfig input, Path jobRoot) throws InvalidCredentialsException { - OutputAndStatus discoveryOutput = runDiscovery(input, workspaceRoot); + OutputAndStatus discoveryOutput = runDiscovery(input, jobRoot); final ObjectMapper objectMapper = new ObjectMapper(); final String configDotJson; @@ -87,36 +89,24 @@ public Stream create(StandardTapConfig input, Path workspaceRoot) throw new RuntimeException(e); } - // write config.json to disk - Path configPath = - WorkerUtils.writeFileToWorkspace(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson); - Path catalogPath = - WorkerUtils.writeFileToWorkspace(workspaceRoot, CATALOG_JSON_FILENAME, catalogDotJson); - Path statePath = - WorkerUtils.writeFileToWorkspace(workspaceRoot, STATE_JSON_FILENAME, stateDotJson); + WorkerUtils.writeFileToWorkspace(jobRoot, CONFIG_JSON_FILENAME, configDotJson); + WorkerUtils.writeFileToWorkspace(jobRoot, CATALOG_JSON_FILENAME, catalogDotJson); + WorkerUtils.writeFileToWorkspace(jobRoot, STATE_JSON_FILENAME, stateDotJson); try { - - String[] tapCmd = - DockerUtils.getDockerCommand( - workspaceRoot, - dockerImageName, - "--config", - configPath.toString(), - // TODO support both --properties and --catalog depending on integration - "--properties", - catalogPath.toString(), - "--state", - statePath.toString()); - - LOGGER.info("running command: {}", Arrays.toString(tapCmd)); - tapProcess = - new ProcessBuilder() - .command(tapCmd) - .redirectError(workspaceRoot.resolve(DefaultSyncWorker.TAP_ERR_LOG).toFile()) + pbf.create( + jobRoot, + imageName, + "--config", + CONFIG_JSON_FILENAME, + // TODO support both --properties and --catalog depending on integration + "--properties", + CATALOG_JSON_FILENAME, + "--state", + STATE_JSON_FILENAME) + .redirectError(jobRoot.resolve(DefaultSyncWorker.TAP_ERR_LOG).toFile()) .start(); - } catch (IOException e) { throw new RuntimeException(e); } @@ -145,7 +135,7 @@ private OutputAndStatus runDiscovery(StandardTapConfig input, Pat discoveryInput.setConnectionConfiguration( input.getSourceConnectionImplementation().getConfiguration()); Path scopedWorkspace = workspaceRoot.resolve("discovery"); - return new SingerDiscoverSchemaWorker(dockerImageName) + return new SingerDiscoverSchemaWorker(imageName, pbf) .runInternal(discoveryInput, scopedWorkspace); } } diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTargetFactory.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTargetFactory.java index 45019f0f9481..b2e6ce4249da 100644 --- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTargetFactory.java +++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTargetFactory.java @@ -34,7 +34,7 @@ import io.dataline.workers.TargetConsumer; import io.dataline.workers.TargetFactory; import io.dataline.workers.WorkerUtils; -import io.dataline.workers.utils.DockerUtils; +import io.dataline.workers.process.ProcessBuilderFactory; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; @@ -43,19 +43,21 @@ import org.slf4j.LoggerFactory; public class SingerTargetFactory implements TargetFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SingerTargetFactory.class); private static final String CONFIG_JSON_FILENAME = "target_config.json"; - private final String dockerImageName; + private final String imageName; + private final ProcessBuilderFactory pbf; - public SingerTargetFactory(String dockerImageName) { - this.dockerImageName = dockerImageName; + public SingerTargetFactory(final String imageName, final ProcessBuilderFactory pbf) { + this.imageName = imageName; + this.pbf = pbf; } @Override - public CloseableConsumer create( - StandardTargetConfig targetConfig, Path workspacePath) { + public CloseableConsumer create(StandardTargetConfig targetConfig, Path jobRoot) { final ObjectMapper objectMapper = new ObjectMapper(); final String configDotJson; @@ -69,17 +71,12 @@ public CloseableConsumer create( // write config.json to disk Path configPath = - WorkerUtils.writeFileToWorkspace(workspacePath, CONFIG_JSON_FILENAME, configDotJson); - - String[] dockerCmd = - DockerUtils.getDockerCommand( - workspacePath, dockerImageName, "--config", configPath.toString()); + WorkerUtils.writeFileToWorkspace(jobRoot, CONFIG_JSON_FILENAME, configDotJson); try { final Process targetProcess = - new ProcessBuilder() - .command(dockerCmd) - .redirectError(workspacePath.resolve(DefaultSyncWorker.TARGET_ERR_LOG).toFile()) + pbf.create(jobRoot, imageName, "--config", configPath.toString()) + .redirectError(jobRoot.resolve(DefaultSyncWorker.TARGET_ERR_LOG).toFile()) .start(); try (BufferedWriter writer = diff --git a/dataline-workers/src/main/java/io/dataline/workers/utils/DockerUtils.java b/dataline-workers/src/main/java/io/dataline/workers/utils/DockerUtils.java deleted file mode 100644 index 744dca036bd6..000000000000 --- a/dataline-workers/src/main/java/io/dataline/workers/utils/DockerUtils.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Dataline - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.dataline.workers.utils; - -import java.nio.file.Path; -import org.apache.commons.lang3.ArrayUtils; - -public class DockerUtils { - public static String[] getDockerCommand(Path workspaceRoot, String imageName, String... args) { - final String[] dockerCommander = { - "docker", - "run", - "-v", - String.format("%s:/dataline/data", workspaceRoot.toString()), - "--network=host", - imageName - }; - - return ArrayUtils.addAll(dockerCommander, args); - } -} diff --git a/dataline-workers/src/test/java/io/dataline/workers/BaseWorkerTestCase.java b/dataline-workers/src/test/java/io/dataline/workers/BaseWorkerTestCase.java index 2ac0e9189313..99954b61e57d 100644 --- a/dataline-workers/src/test/java/io/dataline/workers/BaseWorkerTestCase.java +++ b/dataline-workers/src/test/java/io/dataline/workers/BaseWorkerTestCase.java @@ -28,6 +28,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Resources; +import io.dataline.workers.process.DockerProcessBuilderFactory; +import io.dataline.workers.process.ProcessBuilderFactory; import java.io.File; import java.io.IOException; import java.net.URL; @@ -36,28 +38,26 @@ import java.nio.file.Path; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; -import org.testcontainers.shaded.org.apache.commons.io.FileUtils; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class BaseWorkerTestCase { // TODO inject via env - protected Path workspaceDirectory; + protected Path workspaceRoot; + protected ProcessBuilderFactory pbf; @BeforeAll public void init() throws IOException { - FileUtils.forceMkdir(new File("/tmp/tests")); - workspaceDirectory = Files.createTempDirectory(Path.of("/tmp/tests"), "dataline"); - System.out.println("Workspace directory: " + workspaceDirectory.toString()); + final Path testsPath = Path.of("/tmp/tests"); + Files.createDirectories(testsPath); + this.workspaceRoot = Files.createTempDirectory(testsPath, "dataline"); + this.pbf = new DockerProcessBuilderFactory(workspaceRoot, workspaceRoot.toString(), "host"); + + System.out.println("Workspace directory: " + workspaceRoot.toString()); } - protected Path createWorkspacePath(String jobId) { - final Path workspacePath = workspaceDirectory.resolve(jobId); - try { - FileUtils.forceMkdir(workspacePath.toFile()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return workspacePath; + protected Path createJobRoot(String jobId) throws IOException { + final Path jobRoot = workspaceRoot.resolve(jobId); + return Files.createDirectories(jobRoot); } protected String readResource(String name) { diff --git a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java index 50a0f4231fd9..9e34b64e9abe 100644 --- a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java +++ b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java @@ -43,7 +43,6 @@ import java.sql.DriverManager; import java.sql.SQLException; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.PostgreSQLContainer; @@ -73,9 +72,9 @@ public void testNonexistentDb() standardCheckConnectionInput.setConnectionConfiguration(o); SingerCheckConnectionWorker worker = - new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage()); + new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf); OutputAndStatus run = - worker.run(standardCheckConnectionInput, createWorkspacePath(jobId)); + worker.run(standardCheckConnectionInput, createJobRoot(jobId)); assertEquals(FAILED, run.getStatus()); assertTrue(run.getOutput().isPresent()); @@ -97,7 +96,7 @@ public void testIncorrectAuthCredentials() db.getFirstMappedPort() + ""); SingerCheckConnectionWorker worker = - new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage()); + new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf); final Object o = new ObjectMapper().readValue(incorrectCreds, Object.class); final StandardCheckConnectionInput standardCheckConnectionInput = @@ -105,7 +104,7 @@ public void testIncorrectAuthCredentials() standardCheckConnectionInput.setConnectionConfiguration(o); OutputAndStatus run = - worker.run(standardCheckConnectionInput, createWorkspacePath(jobId)); + worker.run(standardCheckConnectionInput, createJobRoot(jobId)); assertEquals(FAILED, run.getStatus()); assertTrue(run.getOutput().isPresent()); @@ -114,7 +113,6 @@ public void testIncorrectAuthCredentials() // in the logs } - @Disabled @Test public void testSuccessfulConnection() throws IOException, InvalidCredentialsException, InvalidCatalogException { @@ -128,9 +126,9 @@ public void testSuccessfulConnection() standardCheckConnectionInput.setConnectionConfiguration(o); SingerCheckConnectionWorker worker = - new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage()); + new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf); OutputAndStatus run = - worker.run(standardCheckConnectionInput, createWorkspacePath(jobId)); + worker.run(standardCheckConnectionInput, createJobRoot(jobId)); assertEquals(SUCCESSFUL, run.getStatus()); assertTrue(run.getOutput().isPresent()); diff --git a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoverSchemaWorkerTest.java b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoverSchemaWorkerTest.java index 48579e84f9dd..fee9b259bc69 100644 --- a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoverSchemaWorkerTest.java +++ b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoverSchemaWorkerTest.java @@ -34,19 +34,16 @@ import io.dataline.config.StandardDiscoverSchemaOutput; import io.dataline.integrations.Integrations; import io.dataline.workers.BaseWorkerTestCase; -import io.dataline.workers.InvalidCatalogException; import io.dataline.workers.InvalidCredentialsException; import io.dataline.workers.OutputAndStatus; import io.dataline.workers.PostgreSQLContainerTestHelper; import java.io.IOException; -import java.sql.SQLException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; @@ -56,28 +53,25 @@ public class SingerDiscoverSchemaWorkerTest extends BaseWorkerTestCase { PostgreSQLContainer db; @BeforeAll - public void initDb() throws SQLException, IOException, InterruptedException { + public void initDb() throws IOException, InterruptedException { db = new PostgreSQLContainer(); db.start(); PostgreSQLContainerTestHelper.runSqlScript( MountableFile.forClasspathResource("simple_postgres_init.sql"), db); } - @Disabled @Test - public void testPostgresDiscovery() - throws IOException, InvalidCredentialsException, InvalidCatalogException { + public void testPostgresDiscovery() throws IOException, InvalidCredentialsException { final String jobId = "1"; String postgresCreds = PostgreSQLContainerTestHelper.getSingerTapConfig(db); final Object o = new ObjectMapper().readValue(postgresCreds, Object.class); final StandardDiscoverSchemaInput input = new StandardDiscoverSchemaInput(); input.setConnectionConfiguration(o); - + System.out.println(input); SingerDiscoverSchemaWorker worker = - new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage()); + new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage(), pbf); - OutputAndStatus run = - worker.run(input, createWorkspacePath(jobId)); + OutputAndStatus run = worker.run(input, createJobRoot(jobId)); assertEquals(SUCCESSFUL, run.getStatus()); @@ -100,17 +94,17 @@ public void testCancellation() throws IOException, InterruptedException, Executi input.setConnectionConfiguration(o); SingerDiscoverSchemaWorker worker = - new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage()); + new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage(), pbf); ExecutorService threadPool = Executors.newFixedThreadPool(2); Future workerWasCancelled = threadPool.submit( () -> { - OutputAndStatus output = null; try { - output = worker.run(input, createWorkspacePath(jobId)); + OutputAndStatus output = + worker.run(input, createJobRoot(jobId)); assertEquals(FAILED, output.getStatus()); - } catch (InvalidCredentialsException e) { + } catch (Exception e) { throw new RuntimeException(e); } }); diff --git a/dataline-workers/src/test/resources/simple_discovered_postgres_schema.json b/dataline-workers/src/test/resources/simple_discovered_postgres_schema.json index 0c18e9c7853b..650ee726b0ff 100644 --- a/dataline-workers/src/test/resources/simple_discovered_postgres_schema.json +++ b/dataline-workers/src/test/resources/simple_discovered_postgres_schema.json @@ -3,7 +3,7 @@ "tables": [ { "name": "id_and_name", - "selected": true, + "selected": false, "columns": [ { "name": "name", diff --git a/docker-compose.build.yaml b/docker-compose.build.yaml index c0122dc13c62..51d8a7d2d912 100644 --- a/docker-compose.build.yaml +++ b/docker-compose.build.yaml @@ -24,3 +24,10 @@ services: dockerfile: Dockerfile.build target: webapp context: . + +volumes: + workspace: + +networks: + dataline: + driver: "bridge" diff --git a/docker-compose.yaml b/docker-compose.yaml index 92fee4501903..89f7e0c94580 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -18,7 +18,7 @@ services: - DATABASE_URL=${DATABASE_URL} - WAIT_BEFORE_HOSTS=5 - WAIT_HOSTS=db:5432 - - CONFIG_PERSISTENCE_ROOT=${CONFIG_PERSISTENCE_ROOT} + - CONFIG_ROOT=${CONFIG_ROOT} depends_on: - db server: @@ -31,7 +31,7 @@ services: - DATABASE_URL=${DATABASE_URL} - WAIT_BEFORE_HOSTS=5 - WAIT_HOSTS=db:5432 - - CONFIG_PERSISTENCE_ROOT=${CONFIG_PERSISTENCE_ROOT} + - CONFIG_ROOT=${CONFIG_ROOT} ports: - 8001:8001 depends_on: diff --git a/tools/bin/gradled.sh b/tools/bin/gradled.sh new file mode 100755 index 000000000000..3bd18f1803b4 --- /dev/null +++ b/tools/bin/gradled.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash + +set -e + +. tools/lib/lib.sh + +IMG_NAME=dataline/build-project:dev +TMP_VOLUME_NAME=gradlew-tmp + +main() { + assert_root + + if [[ $# -gt 0 ]]; then + OPTS= + CMD="./gradlew $@" + else + OPTS=-it + CMD=/bin/bash + fi + local args=${@:-/bin/bash} + + docker build -f Dockerfile.build . -t $IMG_NAME --target build-project + + docker run $OPTS --rm \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/code \ + -p 5005:5005 \ + -e GRADLE_OPTS="-Dorg.gradle.daemon=false" \ + $IMG_NAME $CMD +} + +main "$@"