Skip to content

Commit

Permalink
Prepare clean mount management (airbytehq#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Aug 28, 2020
1 parent 5d696f0 commit f344ecb
Show file tree
Hide file tree
Showing 40 changed files with 533 additions and 246 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.dockerignore
.git
.idea
.gradle
**/build
**/node_modules
Dockerfile.*
Expand Down
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_DB=dataline
DATABASE_URL=jdbc:postgresql://db:5432/dataline
CONFIG_PERSISTENCE_ROOT=data/config
CONFIG_ROOT=data/config
WORKSPACE_ROOT=/tmp/workspace
9 changes: 8 additions & 1 deletion Dockerfile.build
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
#####################
FROM ubuntu:20.04 AS build-base

WORKDIR /code

ENV DEBIAN_FRONTEND noninteractive

# Install tools
RUN apt-get update && apt-get -y install curl

# Setup Node & Java
RUN curl -sL https://deb.nodesource.com/setup_14.x | bash -
RUN apt-get update && apt-get -y install \
nodejs \
openjdk-14-jdk
openjdk-14-jdk \
docker.io

#######################
# Prepare project env #
Expand All @@ -29,6 +34,8 @@ RUN ./gradlew build --no-daemon -g /home/gradle/.gradle
###################
FROM build-project AS build

WORKDIR /code

# Copy code, etc.
COPY . /code

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@

// we force all interaction with disk storage to be effectively single threaded.
public class DefaultConfigPersistence implements ConfigPersistence {

private static final Object lock = new Object();

private final ObjectMapper objectMapper;
private final JsonSchemaValidation jsonSchemaValidation;
private final String storageRoot;
private final Path storageRoot;

public DefaultConfigPersistence(String storageRoot) {
public DefaultConfigPersistence(Path storageRoot) {
this.storageRoot = storageRoot;
jsonSchemaValidation = new JsonSchemaValidation();
objectMapper = new ObjectMapper();
Expand Down Expand Up @@ -145,7 +146,7 @@ private Set<Path> getFiles(PersistenceConfigType persistenceConfigType) {
}

private Path getConfigDirectory(PersistenceConfigType persistenceConfigType) {
return Path.of(storageRoot).resolve(persistenceConfigType.toString());
return storageRoot.resolve(persistenceConfigType.toString());
}

private Path getConfigPath(PersistenceConfigType persistenceConfigType, String configId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DefaultConfigPersistenceTest {
@BeforeEach
void setUp() throws IOException {
rootPath = Files.createTempDirectory(DefaultConfigPersistenceTest.class.getName());
configPersistence = new DefaultConfigPersistence(rootPath.toString());
configPersistence = new DefaultConfigPersistence(rootPath);
}

private StandardSource generateStandardSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
* SOFTWARE.
*/

package io.dataline.workers.singer.postgres_tap;
package io.dataline.config;

import io.dataline.integrations.Integrations;
import io.dataline.workers.singer.SingerDiscoverSchemaWorker;
import java.nio.file.Path;

public class SingerPostgresTapDiscoverWorker extends SingerDiscoverSchemaWorker {
public interface Configs {

public SingerPostgresTapDiscoverWorker() {
super(Integrations.POSTGRES_TAP.getSyncImage());
}
Path getConfigRoot();

Path getWorkspaceRoot();

String getWorkspaceDockerMount();

String getDockerNetwork();
}
93 changes: 93 additions & 0 deletions dataline-config/src/main/java/io/dataline/config/EnvConfigs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.dataline.config;

import java.nio.file.Path;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EnvConfigs implements Configs {

private static final Logger LOGGER = LoggerFactory.getLogger(EnvConfigs.class);

public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT";
public static final String CONFIG_ROOT = "CONFIG_ROOT";
public static final String DOCKER_NETWORK = "DOCKER_NETWORK";

public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;

public EnvConfigs() {
this(System::getenv);
}

EnvConfigs(final Function<String, String> getEnv) {
this.getEnv = getEnv;
}

@Override
public Path getConfigRoot() {
return getPath(CONFIG_ROOT);
}

@Override
public Path getWorkspaceRoot() {
return getPath(WORKSPACE_ROOT);
}

@Override
public String getWorkspaceDockerMount() {
final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT);

if (mount != null) {
return mount;
}

LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT);
return getWorkspaceRoot().toString();
}

@Override
public String getDockerNetwork() {
final String network = getEnv.apply(DOCKER_NETWORK);
if (network != null) {
return network;
}

LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK);
return DEFAULT_NETWORK;
}

private Path getPath(final String name) {
final String value = getEnv.apply(name);
if (value == null) {
throw new IllegalArgumentException("Env variable not defined: " + name);
}
return Path.of(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.dataline.config;

import static org.mockito.Mockito.when;

import java.nio.file.Paths;
import java.util.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class EnvConfigsTest {

private Function<String, String> function;
private EnvConfigs config;

@BeforeEach
void setUp() {
function = Mockito.mock(Function.class);
config = new EnvConfigs(function);
}

@Test
void ensureGetEnvBehavior() {
Assertions.assertNull(System.getenv("MY_RANDOM_VAR_1234"));
}

@Test
void testWorkspaceRoot() {
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceRoot());

when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getWorkspaceRoot());
}

@Test
void testConfigRoot() {
when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getConfigRoot());

when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn("a/b");
Assertions.assertEquals(Paths.get("a/b"), config.getConfigRoot());
}

@Test
void testGetDockerMount() {
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
Assertions.assertEquals("abc/def", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertEquals("root", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceDockerMount());
}

@Test
void testDockerNetwork() {
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn(null);
Assertions.assertEquals("host", config.getDockerNetwork());

when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc");
Assertions.assertEquals("abc", config.getDockerNetwork());
}
}
2 changes: 0 additions & 2 deletions dataline-integrations/singer/csv/destination/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,4 @@ COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["target-csv"]
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,4 @@ COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["tap-exchangeratesapi"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@ RUN apt-get update && \
python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data
ENTRYPOINT ["target-postgres"]
2 changes: 0 additions & 2 deletions dataline-integrations/singer/postgres/source/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,4 @@ RUN python -m pip install --upgrade pip && \

RUN apt-get autoremove -y gcc

WORKDIR /singer/data

ENTRYPOINT ["tap-postgres"]
2 changes: 1 addition & 1 deletion dataline-scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ application {

run {
// default for running on local machine.
environment "CONFIG_PERSISTENCE_ROOT", new File(".").absolutePath + "data/config"
environment "CONFIG_ROOT", new File(".").absolutePath + "data/config"
environment "VERSION", "0.1.0"
environment "DATABASE_USER", "postgres"
environment "DATABASE_PASSWORD", ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static Long getSecondsInUnit(Schedule.TimeUnit timeUnitEnum) {
}

private static Long getIntervalInSeconds(Schedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits().longValue();
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

private Set<StandardSync> getAllActiveConnections() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package io.dataline.scheduler;

import io.dataline.db.DatabaseHelper;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -40,14 +42,20 @@ public class JobSubmitter implements Runnable {
private final ExecutorService threadPool;
private final BasicDataSource connectionPool;
private final SchedulerPersistence persistence;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;

public JobSubmitter(
ExecutorService threadPool,
BasicDataSource connectionPool,
SchedulerPersistence persistence) {
final ExecutorService threadPool,
final BasicDataSource connectionPool,
final SchedulerPersistence persistence,
final Path workspaceRoot,
final ProcessBuilderFactory pbf) {
this.threadPool = threadPool;
this.connectionPool = connectionPool;
this.persistence = persistence;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}

@Override
Expand Down Expand Up @@ -86,6 +94,7 @@ private Optional<Job> getOldestPendingJob() throws SQLException {
}

private void submitJob(Job job) {
threadPool.submit(new WorkerRunner(job.getId(), connectionPool, persistence));
threadPool.submit(
new WorkerRunner(job.getId(), connectionPool, persistence, workspaceRoot, pbf));
}
}
Loading

0 comments on commit f344ecb

Please sign in to comment.