Skip to content

Commit

Permalink
chore: refactor workload acceptance tests to use dynamic feature flag…
Browse files Browse the repository at this point in the history
…s (#13492)
  • Loading branch information
gosusnp committed Sep 17, 2024
1 parent 0993ad0 commit f5b500b
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 28 deletions.
1 change: 1 addition & 0 deletions airbyte-featureflag-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation(libs.jackson.databind)
implementation(libs.jackson.dataformat)
implementation(libs.jackson.kotlin)
implementation(libs.kotlin.logging)

implementation(project(":oss:airbyte-commons"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package io.airbyte.featureflag.server
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.airbyte.commons.json.Jsons
import io.airbyte.featureflag.server.model.Context
import io.airbyte.featureflag.server.model.FeatureFlag
import io.airbyte.featureflag.server.model.Rule
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Property
import jakarta.inject.Singleton
import java.nio.file.Path
import kotlin.io.path.exists
import kotlin.io.path.isRegularFile

private val logger = KotlinLogging.logger {}

// This is open for testing, creating an interface might be the way to go
@Singleton
open class FeatureFlagService(
Expand All @@ -29,6 +33,7 @@ open class FeatureFlagService(
}
}
}
logger.info { "FeatureFlagService loaded with ${flags.toPrettyJson()}" }
}

open fun delete(key: String) {
Expand All @@ -38,6 +43,15 @@ open class FeatureFlagService(
open fun eval(
key: String,
context: Map<String, String>,
): String? {
val result = doEval(key, context)
logger.debug { "Evaluating $key with $context to $result" }
return result
}

private fun doEval(
key: String,
context: Map<String, String>,
): String? {
val flag = flags[key] ?: return null
for (rule in flag.rules) {
Expand All @@ -62,6 +76,7 @@ open class FeatureFlagService(
throw Exception("$key already has a rule for context ${rule.context}")
}
flag.rules.add(rule.toMutableRule())
logger.debug { "Updated $key to $flag" }
return flag.toFeatureFlag()
}

Expand All @@ -74,6 +89,7 @@ open class FeatureFlagService(
.find { it.context == rule.context }
?.apply { value = rule.value }
?: throw Exception("$key does not have a rule for context ${rule.context}")
logger.debug { "Updated $key to $flag" }
return flag.toFeatureFlag()
}

Expand All @@ -83,11 +99,13 @@ open class FeatureFlagService(
): FeatureFlag {
val flag = flags[key] ?: throw Exception("$key not found")
flag.rules.removeIf { it.context == context }
logger.debug { "Updated $key to $flag" }
return flag.toFeatureFlag()
}

open fun put(flag: FeatureFlag): FeatureFlag {
flags[flag.key] = flag.toMutableFeatureFlag()
logger.debug { "Updated ${flag.key} to $flag" }
return get(flag.key) ?: throw Exception("Failed to put flag $flag")
}

Expand All @@ -100,6 +118,8 @@ open class FeatureFlagService(
return put(flag)
}

private fun <T : Any> T.toPrettyJson(): String = Jsons.toPrettyString(Jsons.jsonNode(this))

private fun Context.matches(env: Map<String, String>): Boolean = env[kind] == value

private fun MutableFeatureFlag.toFeatureFlag(): FeatureFlag = FeatureFlag(key = key, default = default, rules = rules.map { it.toRule() }.toList())
Expand Down
104 changes: 83 additions & 21 deletions airbyte-featureflag/src/main/kotlin/tests/TestFlagsSetter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody

class TestFlagsSetter {
private val baseurl = "http://local.airbyte.dev/api/v1/feature-flags"
class TestFlagsSetter(baseUrl: String) {
private val basePath = "/api/v1/feature-flags"
private val httpClient = OkHttpClient().newBuilder().build()
private val urlPrefix = if (baseUrl.endsWith("/")) "${baseUrl.trimEnd('/')}$basePath" else "$baseUrl$basePath"

class FlagOverride<T>(
private val flag: Flag<T>,
context: Context,
context: Context? = null,
value: T,
private val testFlags: TestFlagsSetter,
) : AutoCloseable {
Expand All @@ -27,61 +28,122 @@ class TestFlagsSetter {
}
}

class FlagRuleOverride<T>(
private val flag: Flag<T>,
private val context: Context,
private val value: T,
private val testFlags: TestFlagsSetter,
) : AutoCloseable {
init {
testFlags.setRule(flag, context, value)
}

override fun close() {
testFlags.deleteRule(flag, context)
}
}

fun <T> withFlag(
flag: Flag<T>,
context: Context,
value: T,
context: Context? = null,
) = FlagOverride(flag, context, value, this)

fun <T> deleteFlag(flag: Flag<T>) {
httpClient.newCall(
Request.Builder()
.url("$baseurl/${flag.key}")
.url("$urlPrefix/${flag.key}")
.delete()
.build(),
).execute()
}

fun <T> setFlag(
fun <T> withRule(
flag: Flag<T>,
context: Context,
value: T,
) = FlagRuleOverride(flag, context, value, this)

fun <T> setFlag(
flag: Flag<T>,
context: Context? = null,
value: T,
) {
val requestFlag =
ApiFeatureFlag(
key = flag.key,
default = flag.default.toString(),
rules =
listOf(
ApiRule(
context = ApiContext(kind = context.kind, value = context.key),
value = value.toString(),
),
),
if (context != null) {
listOf(
ApiRule(
context = ApiContext(kind = context.kind, value = context.key),
value = value.toString(),
),
)
} else {
emptyList()
},
)
httpClient.newCall(
val response =
httpClient.newCall(
Request.Builder()
.url(urlPrefix)
.put(Jsons.serialize(requestFlag).toRequestBody("application/json".toMediaType()))
.build(),
).execute()
assert(response.code == 200, { "Failed to update the feature flag ${requestFlag.key}, error: ${response.code}: ${response.body?.string()}" })
}

fun <T> getFlag(flag: Flag<T>): String? {
return httpClient.newCall(
Request.Builder()
.url(baseurl)
.put(Jsons.serialize(requestFlag).toRequestBody("application/json".toMediaType()))
.url("$urlPrefix/${flag.key}")
.build(),
).execute()
.body?.string()
}

fun <T> getFlag(flag: Flag<T>) {
httpClient.newCall(
fun <T> evalFlag(
flag: Flag<T>,
context: Context,
): String? {
return httpClient.newCall(
Request.Builder()
.url("$baseurl/${flag.key}")
.url("$urlPrefix/${flag.key}/evaluate?kind=${context.kind}&value=${context.key}")
.build(),
).execute()
).execute().body?.string()
}

fun <T> evalFlag(
fun <T> setRule(
flag: Flag<T>,
context: Context,
value: T,
) {
val requestRule =
ApiRule(
context = ApiContext(kind = context.kind, value = context.key),
value = value.toString(),
)
val response =
httpClient.newCall(
Request.Builder()
.url("$urlPrefix/${flag.key}/rules")
.post(Jsons.serialize(requestRule).toRequestBody("application/json".toMediaType()))
.build(),
).execute()
assert(response.code == 200, { "Failed to update the feature flag ${flag.key}, error: ${response.code}: ${response.body?.string()}" })
}

fun <T> deleteRule(
flag: Flag<T>,
context: Context,
) {
val requestContext = ApiContext(kind = context.kind, value = context.key)
httpClient.newCall(
Request.Builder()
.url("$baseurl/${flag.key}/evaluate?kind=${context.kind}&value=${context.key}")
.url("$urlPrefix/${flag.key}/rules")
.delete(Jsons.serialize(requestContext).toRequestBody("application/json".toMediaType()))
.build(),
).execute()
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-test-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
implementation(project(":oss:airbyte-commons-storage"))
implementation(project(":oss:airbyte-commons-temporal"))
implementation(project(":oss:airbyte-commons-worker"))
implementation(project(":oss:airbyte-featureflag"))
implementation(libs.bundles.kubernetes.client)
implementation(libs.bundles.flyway)
implementation(libs.temporal.sdk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import io.airbyte.db.Database;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.featureflag.tests.TestFlagsSetter;
import io.airbyte.test.container.AirbyteTestContainer;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -226,6 +227,7 @@ public class AcceptanceTestHarness {

private AirbyteTestContainer airbyteTestContainer;
private final AirbyteApiClient apiClient;
private final TestFlagsSetter testFlagsSetter;
private final UUID defaultWorkspaceId;
private final String postgresSqlInitFile;

Expand Down Expand Up @@ -253,10 +255,19 @@ public void removeConnection(final UUID connection) {
public AcceptanceTestHarness(final AirbyteApiClient apiClient,
final UUID defaultWorkspaceId,
final String postgresSqlInitFile)
throws GeneralSecurityException, URISyntaxException, IOException, InterruptedException {
this(apiClient, null, defaultWorkspaceId, postgresSqlInitFile);
}

public AcceptanceTestHarness(final AirbyteApiClient apiClient,
final TestFlagsSetter testFlagsSetter,
final UUID defaultWorkspaceId,
final String postgresSqlInitFile)
throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException {
// reads env vars to assign static variables
assignEnvVars();
this.apiClient = apiClient;
this.testFlagsSetter = testFlagsSetter;
this.defaultWorkspaceId = defaultWorkspaceId;
this.postgresSqlInitFile = postgresSqlInitFile;

Expand Down Expand Up @@ -325,6 +336,19 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaul
this(apiClient, defaultWorkspaceId, DEFAULT_POSTGRES_INIT_SQL_FILE);
}

public AcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaultWorkspaceId, final TestFlagsSetter testFlagsSetter)
throws GeneralSecurityException, URISyntaxException, IOException, InterruptedException {
this(apiClient, testFlagsSetter, defaultWorkspaceId, DEFAULT_POSTGRES_INIT_SQL_FILE);
}

public AirbyteApiClient getApiClient() {
return apiClient;
}

public TestFlagsSetter getTestFlagsSetter() {
return testFlagsSetter;
}

public void stopDbAndContainers() {
if (isGke) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.api.client.model.generated.WorkspaceCreate;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.featureflag.tests.TestFlagsSetter;
import io.airbyte.test.utils.AcceptanceTestHarness;
import io.airbyte.test.utils.Asserts;
import io.airbyte.test.utils.TestConnectionCreate.Builder;
Expand Down Expand Up @@ -240,8 +241,10 @@ void runIncrementalSyncForAWorkspaceId(final UUID workspaceId) throws Exception
StreamStatusJobType.SYNC);
}

void runSmallSyncForAWorkspaceId(final UUID workspaceId) throws Exception {
LOGGER.info("Starting runSmallSyncForAWorkspaceId()");
record SyncIds(UUID connectionId, Long jobId, Integer attemptNumber) {}

SyncIds runSmallSyncForAWorkspaceId(final UUID workspaceId) throws Exception {
LOGGER.info("Starting runSmallSyncForAWorkspaceId(" + workspaceId + ")");
final UUID sourceId = testHarness.createPostgresSource(workspaceId).getSourceId();
final UUID destinationId = testHarness.createPostgresDestination(workspaceId).getDestinationId();
final SourceDiscoverSchemaRead discoverResult = testHarness.discoverSourceSchemaWithId(sourceId);
Expand Down Expand Up @@ -289,12 +292,15 @@ void runSmallSyncForAWorkspaceId(final UUID workspaceId) throws Exception {
WITHOUT_SCD_TABLE);
Asserts.assertStreamStatuses(testHarness, workspaceId, connectionId, connectionSyncRead1.getJob().getId(), StreamStatusRunState.COMPLETE,
StreamStatusJobType.SYNC);

return new SyncIds(connectionId, connectionSyncRead1.getJob().getId(), connectionSyncRead1.getAttempts().size() - 1);
}

void init() throws URISyntaxException, IOException, InterruptedException, GeneralSecurityException {
final AirbyteApiClient airbyteApiClient =
createAirbyteApiClient(AIRBYTE_SERVER_HOST + "/api",
Map.of(GATEWAY_AUTH_HEADER, CLOUD_API_USER_HEADER_VALUE));
final TestFlagsSetter testFlagsSetter = new TestFlagsSetter(AIRBYTE_SERVER_HOST);

// If a workspace id is passed, use that. Otherwise, create a new workspace.
// NOTE: we want to sometimes use a pre-configured workspace e.g., if we run against a production
Expand Down Expand Up @@ -326,7 +332,7 @@ void init() throws URISyntaxException, IOException, InterruptedException, Genera
LOGGER.info("pg source definition: {}", sourceDef.getDockerImageTag());
LOGGER.info("pg destination definition: {}", destinationDef.getDockerImageTag());

testHarness = new AcceptanceTestHarness(airbyteApiClient, workspaceId);
testHarness = new AcceptanceTestHarness(airbyteApiClient, workspaceId, testFlagsSetter);

testHarness.ensureCleanSlate();
}
Expand Down
Loading

0 comments on commit f5b500b

Please sign in to comment.