Skip to content

Commit

Permalink
Enable stream capable state and add a config.json file to resources o…
Browse files Browse the repository at this point in the history
…f strict encrypt

Enable per stream state for the last suite of strict encrypt tests

Final per stream state enable

Add stream capable state to MysqlDatatype

Enable it for mysqlssl
  • Loading branch information
nguyenaiden committed Sep 21, 2023
1 parent 89ba1b9 commit 9aa88a6
Showing 9 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.MySqlUtils;
@@ -25,7 +26,7 @@ public abstract class AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTe

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
addTestData(container);
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
@@ -30,10 +31,17 @@
import java.util.HashMap;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class MySqlStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
public EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

@@ -42,6 +50,7 @@ public class MySqlStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTest

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"host": "default",
"port": 5555,
"database": "default",
"username": "default",
"replication_method": { "method": "STANDARD" }
}
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.cursor_based.MySqlCursorBasedStateManager;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.mysql.initialsync.MySqlFeatureFlags;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadHandler;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStreamStateManager;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil;
@@ -349,7 +348,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final Instant emittedAt) {

final JsonNode sourceConfig = database.getSourceConfig();
final MySqlFeatureFlags mySqlFeatureFlags = new MySqlFeatureFlags(sourceConfig);
if (isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("Using PK + CDC");
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ public class MySqlFeatureFlags {
public MySqlFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}

private boolean getFlagValue(final String flag) {
return sourceConfig.has(flag) && sourceConfig.get(flag).asBoolean();
}
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.MySqlUtils;
@@ -26,7 +27,7 @@ public abstract class AbstractMySqlSslCertificateSourceAcceptanceTest extends My

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
addTestData(container);
Original file line number Diff line number Diff line change
@@ -38,8 +38,7 @@
public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {

@SystemStub
private EnvironmentVariables environmentVariables;

public EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -15,9 +16,16 @@
import io.airbyte.integrations.util.HostPortResolver;
import java.util.Map;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class MySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {
@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
@@ -26,6 +34,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {

@Override
protected Database setupDatabase() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -21,6 +22,7 @@ public class MySqlSslSourceAcceptanceTest extends MySqlSourceAcceptanceTest {

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()

0 comments on commit 9aa88a6

Please sign in to comment.