diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 34104bfbe..2ddb6377b 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/cli +## 1.8.5 + +### Patch Changes + +Support Kafka trigger type in CLI + +- Updated dependencies [7c96d79] + - @openfn/deploy@0.8.0 + ## 1.8.4 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index c21c9ff6a..996d5250c 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.8.4", + "version": "1.8.5", "description": "CLI devtools for the openfn toolchain.", "engines": { "node": ">=18", diff --git a/packages/deploy/CHANGELOG.md b/packages/deploy/CHANGELOG.md index 87ce343bd..c30e41b6c 100644 --- a/packages/deploy/CHANGELOG.md +++ b/packages/deploy/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/deploy +## 0.8.0 + +### Minor Changes + +- 7c96d79: Support Kafka trigger type in CLI + ## 0.7.1 ### Patch Changes diff --git a/packages/deploy/package.json b/packages/deploy/package.json index 4b8f8fcd7..168f49557 100644 --- a/packages/deploy/package.json +++ b/packages/deploy/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/deploy", - "version": "0.7.1", + "version": "0.8.0", "description": "Deploy projects to Lightning instances", "type": "module", "exports": { diff --git a/packages/deploy/src/stateTransform.ts b/packages/deploy/src/stateTransform.ts index 9f609ae8a..4b89a31b2 100644 --- a/packages/deploy/src/stateTransform.ts +++ b/packages/deploy/src/stateTransform.ts @@ -8,6 +8,7 @@ import { SpecEdge, SpecJobBody, StateEdge, + StateKafkaHost, WorkflowSpec, WorkflowState, } from './types'; @@ -30,6 +31,24 @@ function stringifyJobBody(body: SpecJobBody): string { } } +function transformSpecKafkaHost(hosts: string[] = []): StateKafkaHost[] { + function isValidHost(value: string): boolean { + const regex = /^[^:]+:\d+$/; + return regex.test(value); + } + + return hosts.map((host) => { + if (!isValidHost(host)) { + throw new DeployError( + `Kafka host must be specified in the format host:port, found: ${host}`, + 'VALIDATION_ERROR' + ); + } + const [hostname, port] = host.split(':'); + return [hostname, port]; + }); +} + function getStateJobCredential( specJobCredential: string, stateCredentials: ProjectState['project_credentials'] @@ -122,16 +141,25 @@ function mergeTriggers( splitZip(stateTriggers, specTriggers!).map( ([triggerKey, stateTrigger, specTrigger]) => { if (specTrigger && !stateTrigger) { - return [ - triggerKey, - { - id: crypto.randomUUID(), - ...pickKeys(specTrigger, ['type', 'enabled']), - ...(specTrigger.type === 'cron' - ? { cron_expression: specTrigger.cron_expression } - : {}), - }, - ]; + const trigger: any = { + id: crypto.randomUUID(), + ...pickKeys(specTrigger, ['type', 'enabled']), + }; + + if (specTrigger.type === 'cron') { + trigger.cron_expression = specTrigger.cron_expression; + } + + if (specTrigger.type === 'kafka') { + trigger.kafka_configuration = { + ...specTrigger.kafka_configuration, + hosts: transformSpecKafkaHost( + specTrigger.kafka_configuration?.hosts + ), + }; + } + + return [triggerKey, trigger]; } if (!specTrigger && stateTrigger) { @@ -139,19 +167,26 @@ function mergeTriggers( } // prefer spec, but use state if spec is missing, or default - return [ - triggerKey, - { - id: stateTrigger!.id, - ...{ - type: pickValue(specTrigger!, stateTrigger!, 'type', 'webhook'), - enabled: pickValue(specTrigger!, stateTrigger!, 'enabled', true), - ...(specTrigger!.type === 'cron' - ? { cron_expression: specTrigger!.cron_expression } - : {}), - }, - }, - ]; + const trigger: any = { + id: stateTrigger!.id, + type: pickValue(specTrigger!, stateTrigger!, 'type', 'webhook'), + enabled: pickValue(specTrigger!, stateTrigger!, 'enabled', true), + }; + + if (specTrigger!.type === 'cron') { + trigger.cron_expression = specTrigger!.cron_expression; + } + + if (specTrigger!.type === 'kafka') { + trigger.kafka_configuration = { + ...specTrigger!.kafka_configuration, + hosts: transformSpecKafkaHost( + specTrigger!.kafka_configuration?.hosts ?? [] + ), + }; + } + + return [triggerKey, trigger]; } ) ); diff --git a/packages/deploy/src/types.ts b/packages/deploy/src/types.ts index d721541e3..90f801bf3 100644 --- a/packages/deploy/src/types.ts +++ b/packages/deploy/src/types.ts @@ -22,12 +22,36 @@ export type SpecJob = { credential: string | null; }; -export type Trigger = { - id?: string; - type?: string; +export type StateKafkaHost = [string, string]; + +export type StateKafkaConfiguration = { + hosts: StateKafkaHost[]; + topics: string[]; + initial_offset_reset_policy: string; + connect_timeout: number; +}; + +export type SpecKafkaConfiguration = { + hosts: string[]; + topics: string[]; + initial_offset_reset_policy: string; + connect_timeout: number; +}; + +export type SpecTrigger = { + type: string; + cron_expression?: string; + enabled?: boolean; + kafka_configuration?: SpecKafkaConfiguration; +}; + +export type StateTrigger = { + id: string; + type: string; cron_expression?: string; delete?: boolean; enabled?: boolean; + kafka_configuration?: StateKafkaConfiguration; }; export type StateEdge = { @@ -55,7 +79,7 @@ export type WorkflowSpec = { id?: string; name: string; jobs?: Record; - triggers?: Record; + triggers?: Record; edges?: Record; }; @@ -81,7 +105,7 @@ export interface WorkflowState { id: string; name: string; jobs: Record>; - triggers: Record>; + triggers: Record>; edges: Record>; delete?: boolean; project_id?: string; @@ -109,7 +133,7 @@ export interface ProjectPayload { name: string; project_id?: string; jobs: Concrete[]; - triggers: Concrete[]; + triggers: Concrete[]; edges: Concrete[]; }[]; } diff --git a/packages/deploy/test/stateTransform.test.ts b/packages/deploy/test/stateTransform.test.ts index 760813257..a82810695 100644 --- a/packages/deploy/test/stateTransform.test.ts +++ b/packages/deploy/test/stateTransform.test.ts @@ -352,6 +352,85 @@ test('toNextState removing a job and edge', (t) => { t.deepEqual(result, existingState); }); +test('toNextState with for kafka trigger', (t) => { + const state = { workflows: {} }; + const spec = { + name: 'my project', + description: 'for the humans', + workflows: { + dummyWorkflow: { + name: 'workflow one', + jobs: { + 'new-job': { + name: 'new job', + adaptor: '@openfn/language-adaptor', + body: 'foo()', + }, + }, + triggers: { + kafka: { + type: 'kafka', + enabled: true, + kafka_configuration: { + hosts: ['localhost:9092'], + topics: ['test'], + connect_timeout: 30, + initial_offset_reset_policy: 'earliest', + }, + }, + }, + edges: {}, + }, + }, + }; + + let result = mergeSpecIntoState(state, spec); + + let expectedHosts = [['localhost', '9092']]; + + t.deepEqual( + result.workflows.dummyWorkflow.triggers.kafka.kafka_configuration.hosts, + expectedHosts + ); + + // deploy error is raised when host is incorrect + const badHosts = ['localhost', 'http://localhost:9092']; + badHosts.forEach((badHost) => { + const badSpec = { + name: 'my project', + description: 'for the humans', + workflows: { + dummyWorkflow: { + name: 'workflow one', + jobs: {}, + edges: {}, + triggers: { + kafka: { + type: 'kafka', + enabled: true, + kafka_configuration: { + hosts: [badHost], + topics: ['test'], + connect_timeout: 30, + initial_offset_reset_policy: 'earliest', + }, + }, + }, + }, + }, + }; + + t.throws( + () => { + mergeSpecIntoState({ workflows: {} }, badSpec); + }, + { + message: `Kafka host must be specified in the format host:port, found: ${badHost}`, + } + ); + }); +}); + test('mergeProjectIntoState with no changes', (t) => { let existingState = fullExampleState(); const workflowOne = existingState.workflows['workflow-one'];