Skip to content

Commit

Permalink
Merge branch 'master' into source-amazon-seller-partner-fix-schema
Browse files Browse the repository at this point in the history
  • Loading branch information
askarpets committed Feb 21, 2024
2 parents 8285ed6 + 29bcceb commit 20b8341
Show file tree
Hide file tree
Showing 99 changed files with 3,415 additions and 669 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/connector_teams_review_requirements.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ on:
- synchronize
paths:
- "airbyte-integrations/connectors/source-**"
- "airbyte-integrations/connectors/destination-**"
pull_request_review:
paths:
- "airbyte-integrations/connectors/source-**"
- "airbyte-integrations/connectors/destination-**"
jobs:
check-review-requirements:
name: "Check if a review is required from Connector teams"
Expand Down
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. |
| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. |
| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. |
| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Stream<AirbyteMessage> toMessages(final String line) {
if (Strings.isEmpty(line)) {
return Stream.of(logMessage(Level.INFO, ""));
}
final Optional<JsonNode> json = Jsons.tryDeserialize(line);
final Optional<JsonNode> json = Jsons.tryDeserializeWithoutWarn(line);
if (json.isPresent()) {
return jsonToMessage(json.get());
} else {
Expand Down Expand Up @@ -96,7 +96,7 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
*/
final String logLevel = (jsonLine.hasNonNull("level")) ? jsonLine.get("level").asText() : "";
String logMsg = jsonLine.hasNonNull("msg") ? jsonLine.get("msg").asText() : "";
Level level;
final Level level;
switch (logLevel) {
case "debug" -> level = Level.DEBUG;
case "info" -> level = Level.INFO;
Expand All @@ -117,15 +117,15 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
}
}

private static AirbyteMessage logMessage(Level level, String message) {
private static AirbyteMessage logMessage(final Level level, final String message) {
return new AirbyteMessage()
.withType(Type.LOG)
.withLog(new AirbyteLogMessage()
.withLevel(level)
.withMessage(message));
}

public static void main(String[] args) {
public static void main(final String[] args) {
final NormalizationLogParser normalizationLogParser = new NormalizationLogParser();
final Stream<AirbyteMessage> airbyteMessageStream =
normalizationLogParser.create(new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8)));
Expand All @@ -135,8 +135,8 @@ public static void main(String[] args) {
final String dbtErrorStack = String.join("\n", errors);
if (!"".equals(dbtErrorStack)) {
final Map<ErrorMapKeys, String> errorMap = SentryExceptionHelper.getUsefulErrorMessageAndTypeFromDbtError(dbtErrorStack);
String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
AirbyteMessage traceMessage = new AirbyteMessage()
final String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
final AirbyteMessage traceMessage = new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(new AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.ERROR)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.21.1
version=0.21.3
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,54 @@
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.apache.logging.log4j.spi.ExtendedLogger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirbyteLogMessageTemplateTest {

private static final ByteArrayOutputStream outputContent = new ByteArrayOutputStream();
private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteLogMessageTemplateTest.class);
public static final String OUTPUT_STREAM_APPENDER = "OutputStreamAppender";
public static final String CONSOLE_JSON_APPENDER = "ConsoleJSONAppender";
private static OutputStreamAppender outputStreamAppender;
private static LoggerConfig rootLoggerConfig;
private static LoggerContext loggerContext;
private LoggerContext loggerContext;
private LoggerConfig rootLoggerConfig;
private ExtendedLogger logger;
private OutputStreamAppender outputStreamAppender;
private ByteArrayOutputStream outputContent;

@BeforeAll
static void init() {
void getLogger() {
// We are creating a log appender with the same output pattern
// as the console json appender defined in this project's log4j2.xml file.
// We then attach this log appender with the LOGGER instance so that we can validate the logs
// produced by code and assert that it matches the expected format.
loggerContext = Configurator.initialize(null, "log4j2.xml");

final Configuration configuration = loggerContext.getConfiguration();
rootLoggerConfig = configuration.getLoggerConfig("");

outputContent = new ByteArrayOutputStream();
outputStreamAppender = OutputStreamAppender.createAppender(
rootLoggerConfig.getAppenders().get(CONSOLE_JSON_APPENDER).getLayout(),
null, outputContent, OUTPUT_STREAM_APPENDER, false, true);
outputStreamAppender.start();

rootLoggerConfig.addAppender(outputStreamAppender, Level.ALL, null);
logger = loggerContext.getLogger(AirbyteLogMessageTemplateTest.class);
}

@BeforeEach
void setup() {
outputContent.reset();
}

@AfterAll
static void cleanUp() {
@AfterEach
void closeLogger() {
outputStreamAppender.stop();
rootLoggerConfig.removeAppender(OUTPUT_STREAM_APPENDER);
loggerContext.close();
}

@Test
public void testAirbyteLogMessageFormat() throws java.io.IOException {
LOGGER.info("hello");
getLogger();
logger.info("hello");

outputContent.flush();
final String logMessage = outputContent.toString(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -114,12 +109,13 @@ private AirbyteLogMessage validateAirbyteMessageIsLog(final AirbyteMessage airby

@ParameterizedTest
@ValueSource(ints = {2, 100, 9000})
public void testAirbyteLogMessageLength(int stringRepeatitions) throws java.io.IOException {
public void testAirbyteLogMessageLength(int stringRepetitions) throws java.io.IOException {
getLogger();
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < stringRepeatitions; i++) {
for (int i = 0; i < stringRepetitions; i++) {
sb.append("abcd");
}
LOGGER.info(sb.toString(), new RuntimeException("aaaaa bbbbbb ccccccc dddddd"));
logger.info(sb.toString(), new RuntimeException("aaaaa bbbbbb ccccccc dddddd"));
outputContent.flush();
final String logMessage = outputContent.toString(StandardCharsets.UTF_8);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.extensions;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.jupiter.api.extension.DynamicTestInvocationContext;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.InvocationInterceptor;
import org.junit.jupiter.api.extension.ReflectiveInvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* By default, junit only output logs to the console, and nothing makes it into log4j logs. This
* class fixes that by using the interceptor facility to print progress and timing information. This
* allows us to have junit loglines in our test logs. This is instanciated via <a href=
* "https://docs.oracle.com/javase%2F9%2Fdocs%2Fapi%2F%2F/java/util/ServiceLoader.html">Java's
* ServiceLoader</a> The declaration can be found in
* resources/META-INF/services/org.junit.jupiter.api.extension.Extension
*/
public class LoggingInvocationInterceptor implements InvocationInterceptor {

private static final class LoggingInvocationInterceptorHandler implements InvocationHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggingInvocationInterceptor.class);

private static final Pattern methodPattern = Pattern.compile("intercept(.*)Method");

@Override
@SuppressWarnings("unchecked")
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (LoggingInvocationInterceptor.class.getDeclaredMethod(method.getName(), Invocation.class, ReflectiveInvocationContext.class,
ExtensionContext.class) == null) {
LOGGER.error("Junit LoggingInvocationInterceptor executing unknown interception point {}", method.getName());
return method.invoke(proxy, args);
}
var invocation = (Invocation<?>) args[0];
var invocationContext = (ReflectiveInvocationContext<Method>) args[1];
var extensionContext = (ExtensionContext) args[2];
String methodName = method.getName();
String logLineSuffix;
Matcher methodMatcher = methodPattern.matcher(methodName);
if (methodName.equals("interceptDynamicTest")) {
logLineSuffix = "execution of DynamicTest %s".formatted(extensionContext.getDisplayName());
} else if (methodName.equals("interceptTestClassConstructor")) {
logLineSuffix = "instance creation for %s".formatted(invocationContext.getTargetClass());
} else if (methodMatcher.matches()) {
String interceptedEvent = methodMatcher.group(1);
logLineSuffix = "execution of @%s method %s.%s".formatted(invocationContext.getExecutable().getDeclaringClass().getSimpleName(),
interceptedEvent, invocationContext.getExecutable().getName());
} else {
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName);
}
LOGGER.info("Junit starting {}", logLineSuffix);
try {
Instant start = Instant.now();
Object retVal = invocation.proceed();
long elapsedMs = Duration.between(start, Instant.now()).toMillis();
LOGGER.info("Junit completed {} in {} ms", logLineSuffix, elapsedMs);
return retVal;
} catch (Throwable t) {
String stackTrace = Arrays.stream(ExceptionUtils.getStackFrames(t)).takeWhile(s -> !s.startsWith("\tat org.junit")).collect(
Collectors.joining("\n "));
LOGGER.warn("Junit exception throw during {}:\n{}", logLineSuffix, stackTrace);
throw t;
}
}

}

private final InvocationInterceptor proxy = (InvocationInterceptor) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class[] {InvocationInterceptor.class},
new LoggingInvocationInterceptorHandler());

@Override
public void interceptAfterAllMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptAfterAllMethod(invocation, invocationContext, extensionContext);
}

@Override
public void interceptAfterEachMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptAfterEachMethod(invocation, invocationContext, extensionContext);
}

@Override
public void interceptBeforeAllMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptBeforeAllMethod(invocation, invocationContext, extensionContext);
}

@Override
public void interceptBeforeEachMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptBeforeEachMethod(invocation, invocationContext, extensionContext);
}

@Override
public void interceptDynamicTest(Invocation<Void> invocation,
DynamicTestInvocationContext invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptDynamicTest(invocation, invocationContext, extensionContext);
}

@Override
public void interceptTestMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptTestMethod(invocation, invocationContext, extensionContext);
}

@Override
public void interceptTestTemplateMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
proxy.interceptTestTemplateMethod(invocation, invocationContext, extensionContext);
}

@Override
public <T> T interceptTestFactoryMethod(Invocation<T> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
return proxy.interceptTestFactoryMethod(invocation, invocationContext, extensionContext);
}

@Override
public <T> T interceptTestClassConstructor(Invocation<T> invocation,
ReflectiveInvocationContext<Constructor<T>> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
return proxy.interceptTestClassConstructor(invocation, invocationContext, extensionContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

Expand Down Expand Up @@ -65,10 +68,13 @@ GenericContainer<?> container() {
}

private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
private static final AtomicInteger containerId = new AtomicInteger(0);

private static final MdcScope.Builder TESTCONTAINER_LOG_MDC_BUILDER = new MdcScope.Builder()
.setLogPrefix("testcontainer")
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List<String> methods) {
return new MdcScope.Builder()
.setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(methods, ",")))
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
}

/**
* Creates a new, unshared testcontainer instance. This usually wraps the default constructor for
Expand Down Expand Up @@ -108,8 +114,16 @@ private GenericContainer<?> createAndStartContainer(DockerImageName imageName, L
for (String methodName : methodNames) {
methods.add(getClass().getMethod(methodName, container.getClass()));
}
final var logConsumer = new Slf4jLogConsumer(LOGGER);
TESTCONTAINER_LOG_MDC_BUILDER.produceMappings(logConsumer::withMdc);
final var logConsumer = new Slf4jLogConsumer(LOGGER) {

public void accept(OutputFrame frame) {
if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) {
super.accept(frame);
}
}

};
getTestContainerLogMdcBuilder(imageName, methodNames).produceMappings(logConsumer::withMdc);
container.withLogConsumer(logConsumer);
for (Method method : methods) {
LOGGER.info("Calling {} in {} on new shared container based on {}.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.airbyte.cdk.extensions.LoggingInvocationInterceptor
Loading

0 comments on commit 20b8341

Please sign in to comment.