Skip to content

Commit

Permalink
feat: add the signal input to the workload table (#14006)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Sep 20, 2024
1 parent 8713c9a commit 3d6fe87
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ components:
autoId:
type: string
format: uuid
signalInput:
type: string
nullable: true
WorkloadCancelRequest:
required:
- reason
Expand Down Expand Up @@ -478,6 +481,9 @@ components:
nullable: true
priority:
$ref: "#/components/schemas/WorkloadPriority"
signalInput:
type: string
nullable: true
WorkloadFailureRequest:
required:
- workloadId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.58.00.001";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.64.4.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.64.7.001";

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public String createWorkload(final ReplicationInput replicationInput, final Path
WorkloadType.SYNC,
WorkloadPriority.DEFAULT,
replicationInput.getConnectionId().toString(),
null);
null,
replicationInput.getSignalInput() == null ? null : Jsons.serialize(replicationInput.getSignalInput()));

// Create the workload
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ internal class WorkloadApiWorkerTest {
this.workspaceId = workspaceId
this.connectionId = connectionId
this.jobRunConfig = jobRunConfig
this.signalInput = "signalInputValue"
}

replicationActivityInput.apply {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.airbyte.config

data class SignalInput(
val workflowType: String,
val workflowId: String,
val taskQueue: String,
) {
companion object {
const val SYNC_WORKFLOW = "sync"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: update migration description in the class name
public class V0_64_4_001__AddFinalizationInputToWorkload extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_64_4_001__AddFinalizationInputToWorkload.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());

final Field<String> finalizationInputField = DSL.field("signal_input", SQLDataType.CLOB.nullable(true));
ctx.alterTable("workload")
.addColumnIfNotExists(finalizationInputField)
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ create table "public"."workload" (
"termination_reason" text,
"auto_id" uuid,
"deadline" timestamp(6) with time zone,
"signal_input" text,
constraint "workload_pkey" primary key ("id")
);
create table "public"."workload_label" (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,6 @@ properties:
description: whether the destination supports refreshes
type: boolean
default: false
signalInput:
description: Signal input for the sync
type: string
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public ConnectorJobOutput runWithWorkload(final CheckConnectionInput input) thro
WorkloadType.CHECK,
WorkloadPriority.Companion.decode(input.getLauncherConfig().getPriority().toString()),
null,
null,
null);

workloadClient.createWorkload(workloadCreateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public ConnectorJobOutput runWithWorkload(final DiscoverCatalogInput input) thro
WorkloadType.DISCOVER,
Objects.requireNonNull(WorkloadPriority.Companion.decode(input.getLauncherConfig().getPriority().toString())),
null,
null,
null);

workloadClient.createWorkload(workloadCreateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public ConnectorJobOutput runWithWorkload(final SpecInput input) throws WorkerEx
WorkloadType.SPEC,
WorkloadPriority.HIGH,
null,
null,
null);

workloadClient.createWorkload(workloadCreateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -96,8 +98,9 @@ void init() throws Exception {
when(logClientManager.fullLogPath(any())).then(i -> Path.of(i.getArguments()[0].toString(), DEFAULT_LOG_FILENAME).toString());
}

@Test
void testStartWithWorkload() throws Exception {
@ParameterizedTest
@EnumSource(ActorType.class)
void testStartWithWorkload(final ActorType actorType) throws Exception {
final CheckConnectionInput input = getCheckInput();

when(jobOutputDocStore.read(WORKLOAD_ID)).thenReturn(Optional.of(new ConnectorJobOutput()
Expand Down Expand Up @@ -156,7 +159,8 @@ private Workload getWorkloadWithStatus(WorkloadStatus status) {
null,
null,
null,
null);
null,
"");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import io.airbyte.workload.api.client.model.generated.WorkloadType
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
Expand Down Expand Up @@ -102,6 +102,7 @@ class DiscoverCatalogActivityTest {
every { workloadIdGenerator.generateDiscoverWorkloadId(actorDefinitionId, jobId, attemptNumber) }.returns(workloadId)
}
every { discoverCatalogActivity.getGeography(Optional.of(connectionId), Optional.of(workspaceId)) }.returns(Geography.AUTO)

every { workloadApi.workloadCreate(any()) }.returns(Unit)
every {
workloadApi.workloadGet(workloadId)
Expand All @@ -111,6 +112,6 @@ class DiscoverCatalogActivityTest {
.withDiscoverCatalogId(UUID.randomUUID())
every { jobOutputDocStore.read(workloadId) }.returns(Optional.of(output))
val actualOutput = discoverCatalogActivity.runWithWorkload(input)
Assertions.assertEquals(output, actualOutput)
assertEquals(output, actualOutput)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ open class WorkloadApi(
workloadCreateRequest.type,
autoId,
workloadCreateRequest.deadline ?: defaultDeadlineValues.createStepDeadline(),
workloadCreateRequest.signalInput,
)
workloadService.create(
workloadId = workloadCreateRequest.workloadId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ data class Workload(
// This is an uniq ID allowing to identify a workload. It is needed in addition of the workloadId to be able to add
// this identifier to the kube pod label.
var autoId: UUID = UUID.randomUUID(),
var signalInput: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ data class WorkloadCreateRequest(
var type: WorkloadType = WorkloadType.SYNC,
var deadline: OffsetDateTime? = null,
var priority: WorkloadPriority = WorkloadPriority.HIGH,
var signalInput: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface WorkloadHandler {
type: WorkloadType,
autoId: UUID,
deadline: OffsetDateTime,
signalInput: String?,
)

fun claimWorkload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class WorkloadHandlerImpl(
type: WorkloadType,
autoId: UUID,
deadline: OffsetDateTime,
signalInput: String?,
) {
val workloadAlreadyExists = workloadRepository.existsById(workloadId)
if (workloadAlreadyExists) {
Expand All @@ -86,6 +87,7 @@ class WorkloadHandlerImpl(
type = type.toDomain(),
autoId = autoId,
deadline = deadline,
signalInput = signalInput,
)
workloadRepository.save(domainWorkload).toApi()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ fun DomainWorkload.toApi(): ApiWorkload {
terminationReason = this.terminationReason,
terminationSource = this.terminationSource,
autoId = if (this.autoId == null) UUID(0, 0) else this.autoId!!,
signalInput = this.signalInput,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ data class Workload(
var deadline: OffsetDateTime? = null,
@Nullable
var autoId: UUID? = null,
@Nullable
var signalInput: String? = null,
) {
@VisibleForTesting
constructor(
Expand All @@ -66,6 +68,7 @@ data class Workload(
geography: String,
mutexKey: String,
type: WorkloadType,
signalInput: String,
) : this(
id = id,
dataplaneId = dataplaneId,
Expand All @@ -82,6 +85,7 @@ data class Workload(
terminationSource = null,
terminationReason = null,
autoId = UUID.randomUUID(),
signalInput = signalInput,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class WorkloadApiTest(
@Test
fun `test create success`() {
every { workloadHandler.workloadAlreadyExists(any()) } returns false
every { workloadHandler.createWorkload(any(), any(), any(), any(), any(), any(), any(), any(), any()) } just Runs
every { workloadHandler.createWorkload(any(), any(), any(), any(), any(), any(), any(), any(), any(), any()) } just Runs
every { workloadService.create(any(), any(), any(), any(), any(), any(), any(), any(), any()) } just Runs
testEndpointStatus(HttpRequest.POST("/api/v1/workload/create", Jsons.serialize(WorkloadCreateRequest())), HttpStatus.NO_CONTENT)
verify(exactly = 1) { workloadHandler.createWorkload(any(), any(), any(), any(), any(), any(), any(), any(), any()) }
verify(exactly = 1) { workloadHandler.createWorkload(any(), any(), any(), any(), any(), any(), any(), any(), any(), any()) }
verify(exactly = 1) { workloadService.create(any(), any(), any(), any(), any(), any(), any(), any(), any()) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class WorkloadHandlerImplTest {
mutexKey = null,
type = WorkloadType.SYNC,
autoId = UUID.randomUUID(),
signalInput = "",
)

every { workloadRepository.findById(WORKLOAD_ID) }.returns(Optional.of(domainWorkload))
Expand Down Expand Up @@ -99,6 +100,7 @@ class WorkloadHandlerImplTest {
io.airbyte.config.WorkloadType.SYNC,
UUID.randomUUID(),
now.plusHours(2),
signalInput = "signal payload",
)
verify {
workloadRepository.save(
Expand All @@ -116,7 +118,8 @@ class WorkloadHandlerImplTest {
it.geography == "US" &&
it.mutexKey == "mutex-this" &&
it.type == WorkloadType.SYNC &&
it.deadline!!.equals(now.plusHours(2))
it.deadline!!.equals(now.plusHours(2)) &&
it.signalInput == "signal payload"
},
)
}
Expand All @@ -126,7 +129,7 @@ class WorkloadHandlerImplTest {
fun `test create workload id conflict`() {
every { workloadRepository.existsById(WORKLOAD_ID) }.returns(true)
assertThrows<ConflictException> {
workloadHandler.createWorkload(WORKLOAD_ID, null, "", "", "US", "mutex-this", io.airbyte.config.WorkloadType.SYNC, UUID.randomUUID(), now)
workloadHandler.createWorkload(WORKLOAD_ID, null, "", "", "US", "mutex-this", io.airbyte.config.WorkloadType.SYNC, UUID.randomUUID(), now, "")
}
}

Expand Down Expand Up @@ -155,7 +158,7 @@ class WorkloadHandlerImplTest {
)
}.returns(duplWorkloads + listOf(newWorkload))

workloadHandler.createWorkload(WORKLOAD_ID, null, "", "", "US", "mutex-this", io.airbyte.config.WorkloadType.SYNC, UUID.randomUUID(), now)
workloadHandler.createWorkload(WORKLOAD_ID, null, "", "", "US", "mutex-this", io.airbyte.config.WorkloadType.SYNC, UUID.randomUUID(), now, "")
verify {
workloadHandler.failWorkload(workloadIdWithFailedFail, any(), any())
workloadHandler.failWorkload(workloadIdWithSuccessfulFail, any(), any())
Expand Down Expand Up @@ -665,6 +668,7 @@ class WorkloadHandlerImplTest {
mutexKey: String = "",
type: WorkloadType = WorkloadType.SYNC,
createdAt: OffsetDateTime = OffsetDateTime.now(),
signalPayload: String = "",
): Workload =
Workload(
id = id,
Expand All @@ -677,6 +681,7 @@ class WorkloadHandlerImplTest {
mutexKey = mutexKey,
type = type,
createdAt = createdAt,
signalInput = signalPayload,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class WorkloadMapperKtTest {
terminationSource = "terminationSource",
deadline = Instant.now().atOffset(ZoneOffset.UTC),
autoId = UUID.randomUUID(),
signalInput = "signalPayload",
)

val apiWorkload = domainWorkload.toApi()
Expand All @@ -51,6 +52,7 @@ class WorkloadMapperKtTest {
assertEquals(domainWorkload.terminationReason, apiWorkload.terminationReason)
assertEquals(domainWorkload.terminationSource, apiWorkload.terminationSource)
assertEquals(domainWorkload.autoId, apiWorkload.autoId)
assertEquals(domainWorkload.signalInput, apiWorkload.signalInput)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ internal class WorkloadRepositoryTest {
val labels = ArrayList<WorkloadLabel>()
labels.add(label1)
labels.add(label2)
val signalInput = "signalInput"
val workload =
Fixtures.workload(
id = WORKLOAD_ID,
dataplaneId = null,
status = WorkloadStatus.PENDING,
workloadLabels = labels,
deadline = defaultDeadline,
signalInput = signalInput,
)
workloadRepo.save(workload)
val persistedWorkload = workloadRepo.findById(WORKLOAD_ID)
Expand All @@ -158,6 +160,7 @@ internal class WorkloadRepositoryTest {
assertNotNull(persistedWorkload.get().deadline)
assertEquals(defaultDeadline.toEpochSecond(), persistedWorkload.get().deadline!!.toEpochSecond())
assertEquals(2, persistedWorkload.get().workloadLabels!!.size)
assertEquals(signalInput, persistedWorkload.get().signalInput)

val workloadLabels = persistedWorkload.get().workloadLabels!!.toMutableList()
workloadLabels.sortWith(Comparator.comparing(WorkloadLabel::key))
Expand Down Expand Up @@ -474,6 +477,7 @@ internal class WorkloadRepositoryTest {
mutexKey: String = "",
type: WorkloadType = WorkloadType.SYNC,
deadline: OffsetDateTime = OffsetDateTime.now(),
signalInput: String = "",
): Workload =
Workload(
id = id,
Expand All @@ -486,6 +490,7 @@ internal class WorkloadRepositoryTest {
mutexKey = mutexKey,
type = type,
deadline = deadline,
signalInput = signalInput,
)
}
}

0 comments on commit 3d6fe87

Please sign in to comment.