Skip to content

Commit

Permalink
Merge branch 'master' into daryna/source-bing-ads/stream-budget-and-p…
Browse files Browse the repository at this point in the history
…roduct_dimension_performance_report
  • Loading branch information
darynaishchenko authored Mar 1, 2024
2 parents df4c4f8 + 61e0b3f commit 66a0cc5
Show file tree
Hide file tree
Showing 81 changed files with 757 additions and 974 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
if: needs.changes.outputs.connectors == 'true'
name: Connectors CI
runs-on: connector-test-large
timeout-minutes: 1440 # 24 hours
timeout-minutes: 360 # 6 hours
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
Expand Down
4 changes: 3 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Add a getNamespace into TestDataHolder |
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.8 | 2024-02-28 | [\#35529](https://github.com/airbytehq/airbyte/pull/35529) | Refactor on state iterators |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separate method |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public enum DatabaseDriver {
DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2://%s:%d/%s"),
STARBURST("io.trino.jdbc.TrinoDriver", "jdbc:trino://%s:%s/%s?SSL=true&source=airbyte"),
MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb://%s:%d/%s"),
MSSQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://%s:%d/%s"),
MSSQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://%s:%d;databaseName=%s"),
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://%s:%d/%s"),
ORACLE("oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@%s:%d/%s"),
VERTICA("com.vertica.jdbc.Driver", "jdbc:vertica://%s:%d/%s"),
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.7
version=0.23.9
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
logLineSuffix = "instance creation for %s".formatted(invocationContext.getTargetClass());
} else if (methodMatcher.matches()) {
String interceptedEvent = methodMatcher.group(1);
logLineSuffix = "execution of @%s method %s.%s".formatted(invocationContext.getExecutable().getDeclaringClass().getSimpleName(),
interceptedEvent, invocationContext.getExecutable().getName());
logLineSuffix = "execution of @%s method %s.%s".formatted(interceptedEvent,
invocationContext.getExecutable().getDeclaringClass().getSimpleName(),
invocationContext.getExecutable().getName());
} else {
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,50 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.cdk.testutils.ContainerFactory;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.utility.DockerImageName;

public class SshBastionContainer implements AutoCloseable {

/**
 * {@link ContainerFactory} for the SSH bastion test container. The image is built from the
 * classpath resource {@code bastion/Dockerfile} instead of being resolved from an image name.
 */
public static class SshBastionContainerFactory extends ContainerFactory<GenericContainer<?>> {

  /**
   * Builds a bastion container exposing port 22.
   *
   * @param imageName not used here; the image always comes from the bundled Dockerfile
   * @return the newly constructed container
   */
  @Override
  protected GenericContainer<?> createNewContainer(DockerImageName imageName) {
    final var bastionImage = new ImageFromDockerfile("bastion-test")
        .withFileFromClasspath("Dockerfile", "bastion/Dockerfile");
    return new GenericContainer(bastionImage).withExposedPorts(22);
  }

  /**
   * Requests an exclusive bastion container from the parent factory, with a single modifier named
   * {@code withNetwork} that attaches the container to the given network.
   *
   * @param network network the bastion container is attached to
   * @return the container handed back by {@code ContainerFactory#exclusive}
   */
  public GenericContainer exclusive(final Network network) {
    final Consumer<GenericContainer<?>> attachNetwork = container -> container.withNetwork(network);
    return super.exclusive("bastion-test", new NamedContainerModifierImpl<>("withNetwork", attachNetwork));
  }

}

private static final SshBastionContainerFactory factory = new SshBastionContainerFactory();

private static final String SSH_USER = "sshuser";
private static final String SSH_PASSWORD = "secret";
private GenericContainer bastion;

public void initAndStartBastion(final Network network) {
bastion = new GenericContainer(
new ImageFromDockerfile("bastion-test")
.withFileFromClasspath("Dockerfile", "bastion/Dockerfile"))
.withNetwork(network)
.withExposedPorts(22);
bastion = factory.exclusive(network);
bastion.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.testutils;

import com.google.common.collect.Lists;
import io.airbyte.commons.logging.LoggingHelper;
import io.airbyte.commons.logging.MdcScope;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -13,13 +14,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
Expand All @@ -28,11 +29,13 @@
* ContainerFactory is the companion to {@link TestDatabase} and provides it with suitable
* testcontainer instances.
*/
public abstract class ContainerFactory<C extends JdbcDatabaseContainer<?>> {
public abstract class ContainerFactory<C extends GenericContainer<?>> {

static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class);

private record ContainerKey(Class<? extends ContainerFactory> clazz, DockerImageName imageName, List<String> methods) {};
private record ContainerKey<C extends GenericContainer<?>> (Class<? extends ContainerFactory> clazz,
DockerImageName imageName,
List<? extends NamedContainerModifier<C>> methods) {};

private static class ContainerOrException {

Expand Down Expand Up @@ -67,12 +70,13 @@ GenericContainer<?> container() {

}

private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
private final ConcurrentMap<ContainerKey<C>, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
private static final AtomicInteger containerId = new AtomicInteger(0);

private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List<String> methods) {
private final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName,
List<? extends NamedContainerModifier<C>> containerModifiers) {
return new MdcScope.Builder()
.setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(methods, ",")))
.setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(containerModifiers, ",")))
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
}

Expand All @@ -84,10 +88,25 @@ private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageN

/**
* Returns a shared instance of the testcontainer.
*
* @deprecated use {@link #shared(String, NamedContainerModifier[])} instead
*/
@SuppressWarnings("unchecked")
@Deprecated
public final C shared(String imageName, String... methods) {
final var containerKey = new ContainerKey(getClass(), DockerImageName.parse(imageName), Stream.of(methods).toList());
return shared(imageName,
Stream.of(methods).map(n -> new NamedContainerModifierImpl<C>(n, resolveModifierByName(n))).toList());
}

/**
 * Returns a shared instance of the testcontainer, keyed by the image name and the given modifiers.
 *
 * @param imageName docker image name
 * @param namedContainerModifiers modifiers forwarded, in order, to the list-based overload
 * @return the shared container
 */
// Safe: the varargs array is only read by List.of, never stored or written to.
@SafeVarargs
public final C shared(String imageName, NamedContainerModifier<C>... namedContainerModifiers) {
  return shared(imageName, List.of(namedContainerModifiers));
}

/**
 * Returns a shared instance of the testcontainer without applying any modifier.
 *
 * @param imageName docker image name
 * @return the shared container
 */
public final C shared(String imageName) {
  final List<NamedContainerModifier<C>> noModifiers = new ArrayList<>();
  return shared(imageName, noModifiers);
}

public final C shared(String imageName, List<? extends NamedContainerModifier<C>> namedContainerModifiers) {
final ContainerKey<C> containerKey = new ContainerKey<>(getClass(), DockerImageName.parse(imageName), namedContainerModifiers);
// We deliberately avoid creating the container itself eagerly during the evaluation of the map
// value.
// Container creation can be exceedingly slow.
Expand All @@ -100,41 +119,83 @@ public final C shared(String imageName, String... methods) {

/**
 * Returns an exclusive instance of the testcontainer.
 *
 * <p>
 * Each entry of {@code methods} is wrapped in a {@link NamedContainerModifierImpl} whose modifier
 * is looked up by name on this factory via {@code resolveModifierByName}.
 *
 * @param imageName docker image name
 * @param methods names of the factory methods to apply to the new container, in order
 * @return the exclusive container
 * @deprecated use {@link #exclusive(String, NamedContainerModifier[])} instead
 */
@Deprecated
public final C exclusive(String imageName, String... methods) {
  // Map to the interface type explicitly so the result is a List<NamedContainerModifier<C>> and
  // binds to the list-based overload. Casting the list itself to NamedContainerModifier<C> would
  // compile but fail with a ClassCastException at runtime.
  final List<NamedContainerModifier<C>> namedContainerModifiers = Stream.of(methods)
      .<NamedContainerModifier<C>>map(n -> new NamedContainerModifierImpl<C>(n, resolveModifierByName(n)))
      .toList();
  return exclusive(imageName, namedContainerModifiers);
}

/**
 * Returns an exclusive instance of the testcontainer without applying any modifier.
 *
 * @param imageName docker image name
 * @return the exclusive container
 */
public final C exclusive(String imageName) {
  final List<NamedContainerModifier<C>> noModifiers = new ArrayList<>();
  return exclusive(imageName, noModifiers);
}

/**
 * Returns an exclusive instance of the testcontainer with the given modifiers applied.
 *
 * @param imageName docker image name
 * @param namedContainerModifiers modifiers forwarded, in order, to the list-based overload
 * @return the exclusive container
 */
// Safe: the varargs array is only read by List.of, never stored or written to.
@SafeVarargs
public final C exclusive(String imageName, NamedContainerModifier<C>... namedContainerModifiers) {
  return exclusive(imageName, List.of(namedContainerModifiers));
}

/**
 * Returns an exclusive instance of the testcontainer: a new container is created and started for
 * every call.
 *
 * @param imageName docker image name, parsed with {@link DockerImageName#parse(String)}
 * @param namedContainerModifiers modifiers handed to {@code createAndStartContainer}; accepts lists
 *        of any {@link NamedContainerModifier} subtype, matching the list-based {@code shared}
 *        overload
 * @return the exclusive container
 */
public final C exclusive(String imageName, List<? extends NamedContainerModifier<C>> namedContainerModifiers) {
  // createAndStartContainer already returns C, so no cast is required.
  return createAndStartContainer(DockerImageName.parse(imageName), namedContainerModifiers);
}

/**
 * A container customization paired with a name.
 *
 * @param <C> type of container the modifier operates on
 */
public interface NamedContainerModifier<C extends GenericContainer<?>> {

/** Name of this modifier; the factory writes it to its log output when creating a container. */
String name();

/** Action the factory invokes on a newly created container before starting it. */
Consumer<C> modifier();

}

private GenericContainer<?> createAndStartContainer(DockerImageName imageName, List<String> methodNames) {
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
try {
GenericContainer<?> container = createNewContainer(imageName);
final var methods = new ArrayList<Method>();
for (String methodName : methodNames) {
methods.add(getClass().getMethod(methodName, container.getClass()));
public record NamedContainerModifierImpl<C extends GenericContainer<?>> (String name, Consumer<C> method) implements NamedContainerModifier<C> {

public String name() {
return name;
}

public Consumer<C> modifier() {
return method;
}

}

private Consumer<C> resolveModifierByName(String methodName) {
final ContainerFactory<C> self = this;
Consumer<C> resolvedMethod = c -> {
try {
Class<? extends GenericContainer> containerClass = c.getClass();
Method method = self.getClass().getMethod(methodName, containerClass);
method.invoke(self, c);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
final var logConsumer = new Slf4jLogConsumer(LOGGER) {
};
return resolvedMethod;
}

public void accept(OutputFrame frame) {
if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) {
super.accept(frame);
}
}
private C createAndStartContainer(DockerImageName imageName, List<? extends NamedContainerModifier<C>> namedContainerModifiers) {
LOGGER.info("Creating new container based on {} with {}.", imageName, Lists.transform(namedContainerModifiers, c -> c.name()));
C container = createNewContainer(imageName);
final var logConsumer = new Slf4jLogConsumer(LOGGER) {

};
getTestContainerLogMdcBuilder(imageName, methodNames).produceMappings(logConsumer::withMdc);
container.withLogConsumer(logConsumer);
for (Method method : methods) {
LOGGER.info("Calling {} in {} on new shared container based on {}.",
method.getName(), getClass().getName(), imageName);
method.invoke(this, container);
public void accept(OutputFrame frame) {
if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) {
super.accept(frame);
}
}
container.start();
return container;
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);

};
getTestContainerLogMdcBuilder(imageName, namedContainerModifiers).produceMappings(logConsumer::withMdc);
container.withLogConsumer(logConsumer);
for (NamedContainerModifier<C> resolvedNamedContainerModifier : namedContainerModifiers) {
LOGGER.info("Calling {} in {} on new container based on {}.",
resolvedNamedContainerModifier.name(), getClass().getName(), imageName);
resolvedNamedContainerModifier.modifier().accept(container);
}
container.start();
return container;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Iterator;
import javax.annotation.CheckForNull;
import org.slf4j.Logger;
Expand All @@ -19,16 +22,22 @@ public class SourceStateIterator<T> extends AbstractIterator<AirbyteMessage> imp

private static final Logger LOGGER = LoggerFactory.getLogger(SourceStateIterator.class);
private final Iterator<T> messageIterator;
private final ConfiguredAirbyteStream stream;
private final StateEmitFrequency stateEmitFrequency;
private boolean hasEmittedFinalState = false;
private long recordCount = 0L;
private Instant lastCheckpoint = Instant.now();

private final SourceStateIteratorManager sourceStateIteratorManager;
private final SourceStateMessageProducer sourceStateMessageProducer;

public SourceStateIterator(final Iterator<T> messageIterator,
final SourceStateIteratorManager sourceStateIteratorManager) {
final ConfiguredAirbyteStream stream,
final SourceStateMessageProducer sourceStateMessageProducer,
final StateEmitFrequency stateEmitFrequency) {
this.messageIterator = messageIterator;
this.sourceStateIteratorManager = sourceStateIteratorManager;
this.stream = stream;
this.sourceStateMessageProducer = sourceStateMessageProducer;
this.stateEmitFrequency = stateEmitFrequency;
}

@CheckForNull
Expand All @@ -45,8 +54,8 @@ protected AirbyteMessage computeNext() {
throw new RuntimeException(ex);
}
if (iteratorHasNextValue) {
if (sourceStateIteratorManager.shouldEmitStateMessage(recordCount, lastCheckpoint)) {
final AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint();
if (shouldEmitStateMessage() && sourceStateMessageProducer.shouldEmitStateMessage(stream)) {
final AirbyteStateMessage stateMessage = sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream);
stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));

recordCount = 0L;
Expand All @@ -58,15 +67,15 @@ protected AirbyteMessage computeNext() {
// Use try-catch to catch Exception that could occur when connection to the database fails
try {
final T message = messageIterator.next();
final AirbyteMessage processedMessage = sourceStateIteratorManager.processRecordMessage(message);
final AirbyteMessage processedMessage = sourceStateMessageProducer.processRecordMessage(stream, message);
recordCount++;
return processedMessage;
} catch (final Exception e) {
throw new RuntimeException(e);
}
} else if (!hasEmittedFinalState) {
hasEmittedFinalState = true;
final AirbyteStateMessage finalStateMessageForStream = sourceStateIteratorManager.createFinalStateMessage();
final AirbyteStateMessage finalStateMessageForStream = sourceStateMessageProducer.createFinalStateMessage(stream);
finalStateMessageForStream.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
recordCount = 0L;
return new AirbyteMessage()
Expand All @@ -77,4 +86,10 @@ protected AirbyteMessage computeNext() {
}
}

/**
 * Tells whether the configured checkpoint frequency has been reached.
 *
 * @return {@code true} when the number of records seen since the counter was last reset is at
 *         least {@code syncCheckpointRecords()}, or when more than {@code syncCheckpointDuration()}
 *         has elapsed since {@code lastCheckpoint}
 */
private boolean shouldEmitStateMessage() {
  if (recordCount >= stateEmitFrequency.syncCheckpointRecords()) {
    return true;
  }
  // lastCheckpoint is an Instant, so compare it against Instant.now() rather than relying on
  // Duration.between converting an OffsetDateTime to an Instant behind the scenes.
  final Duration sinceLastCheckpoint = Duration.between(lastCheckpoint, Instant.now());
  return sinceLastCheckpoint.compareTo(stateEmitFrequency.syncCheckpointDuration()) > 0;
}

}
Loading

0 comments on commit 66a0cc5

Please sign in to comment.