CLI: Support kafka trigger type (#795)
* add kafka trigger types

* put kafka config keys under kafka_configuration as it is in lightning

* update changeset

* refactor and test

* fix failing tests

* use any

* version: [email protected]

---------

Co-authored-by: Joe Clark <[email protected]>
midigofrank and josephjclark authored Oct 16, 2024
1 parent 15763ed commit 591bcc8
Showing 7 changed files with 184 additions and 31 deletions.
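As the commit message notes, the Kafka settings are nested under a `kafka_configuration` key, matching how Lightning stores them. Below is a minimal sketch of a kafka trigger as the CLI now accepts it in a workflow spec, based on the test fixture added in this commit; the values are illustrative only.

```ts
// Illustrative spec fragment, mirroring the new test fixture at the bottom
// of this diff (not copied from Lightning docs).
const triggers = {
  kafka: {
    type: 'kafka',
    enabled: true,
    kafka_configuration: {
      hosts: ['localhost:9092'], // must be host:port; validated on deploy
      topics: ['test'],
      initial_offset_reset_policy: 'earliest',
      connect_timeout: 30,
    },
  },
};
```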
9 changes: 9 additions & 0 deletions packages/cli/CHANGELOG.md
@@ -1,5 +1,14 @@
# @openfn/cli

## 1.8.5

### Patch Changes

- Support Kafka trigger type in CLI

- Updated dependencies [7c96d79]
  - @openfn/deploy@0.8.0

## 1.8.4

### Patch Changes
2 changes: 1 addition & 1 deletion packages/cli/package.json
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.8.4",
"version": "1.8.5",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
6 changes: 6 additions & 0 deletions packages/deploy/CHANGELOG.md
@@ -1,5 +1,11 @@
# @openfn/deploy

## 0.8.0

### Minor Changes

- 7c96d79: Support Kafka trigger type in CLI

## 0.7.1

### Patch Changes
2 changes: 1 addition & 1 deletion packages/deploy/package.json
@@ -1,6 +1,6 @@
{
"name": "@openfn/deploy",
"version": "0.7.1",
"version": "0.8.0",
"description": "Deploy projects to Lightning instances",
"type": "module",
"exports": {
81 changes: 58 additions & 23 deletions packages/deploy/src/stateTransform.ts
@@ -8,6 +8,7 @@ import {
SpecEdge,
SpecJobBody,
StateEdge,
StateKafkaHost,
WorkflowSpec,
WorkflowState,
} from './types';
@@ -30,6 +31,24 @@ function stringifyJobBody(body: SpecJobBody): string {
}
}

function transformSpecKafkaHost(hosts: string[] = []): StateKafkaHost[] {
function isValidHost(value: string): boolean {
const regex = /^[^:]+:\d+$/;
return regex.test(value);
}

return hosts.map((host) => {
if (!isValidHost(host)) {
throw new DeployError(
`Kafka host must be specified in the format host:port, found: ${host}`,
'VALIDATION_ERROR'
);
}
const [hostname, port] = host.split(':');
return [hostname, port];
});
}

function getStateJobCredential(
specJobCredential: string,
stateCredentials: ProjectState['project_credentials']
@@ -122,36 +141,52 @@ function mergeTriggers(
splitZip(stateTriggers, specTriggers!).map(
([triggerKey, stateTrigger, specTrigger]) => {
if (specTrigger && !stateTrigger) {
return [
triggerKey,
{
id: crypto.randomUUID(),
...pickKeys(specTrigger, ['type', 'enabled']),
...(specTrigger.type === 'cron'
? { cron_expression: specTrigger.cron_expression }
: {}),
},
];
const trigger: any = {
id: crypto.randomUUID(),
...pickKeys(specTrigger, ['type', 'enabled']),
};

if (specTrigger.type === 'cron') {
trigger.cron_expression = specTrigger.cron_expression;
}

if (specTrigger.type === 'kafka') {
trigger.kafka_configuration = {
...specTrigger.kafka_configuration,
hosts: transformSpecKafkaHost(
specTrigger.kafka_configuration?.hosts
),
};
}

return [triggerKey, trigger];
}

if (!specTrigger && stateTrigger) {
return [triggerKey, { id: stateTrigger!.id, delete: true }];
}

// prefer spec, but use state if spec is missing, or default
return [
triggerKey,
{
id: stateTrigger!.id,
...{
type: pickValue(specTrigger!, stateTrigger!, 'type', 'webhook'),
enabled: pickValue(specTrigger!, stateTrigger!, 'enabled', true),
...(specTrigger!.type === 'cron'
? { cron_expression: specTrigger!.cron_expression }
: {}),
},
},
];
const trigger: any = {
id: stateTrigger!.id,
type: pickValue(specTrigger!, stateTrigger!, 'type', 'webhook'),
enabled: pickValue(specTrigger!, stateTrigger!, 'enabled', true),
};

if (specTrigger!.type === 'cron') {
trigger.cron_expression = specTrigger!.cron_expression;
}

if (specTrigger!.type === 'kafka') {
trigger.kafka_configuration = {
...specTrigger!.kafka_configuration,
hosts: transformSpecKafkaHost(
specTrigger!.kafka_configuration?.hosts ?? []
),
};
}

return [triggerKey, trigger];
}
)
);
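A standalone sketch of the host handling introduced in `transformSpecKafkaHost` above: each `host:port` string from the spec becomes a `[host, port]` tuple for the state payload, and anything that does not match `host:port` (a bare hostname, a URL with a scheme) is rejected. This reimplements the logic inline for illustration rather than importing from the package.

```ts
// Illustrative re-implementation of the transform above, not an import
// from @openfn/deploy.
const toKafkaHostTuple = (host: string): [string, string] => {
  // must be "<anything without a colon>:<digits>"
  if (!/^[^:]+:\d+$/.test(host)) {
    throw new Error(
      `Kafka host must be specified in the format host:port, found: ${host}`
    );
  }
  const [hostname, port] = host.split(':');
  return [hostname, port];
};

console.log(['localhost:9092'].map(toKafkaHostTuple));
// => [ [ 'localhost', '9092' ] ]

// Both of these throw, as covered by the new test below:
// toKafkaHostTuple('localhost');              // no port
// toKafkaHostTuple('http://localhost:9092');  // scheme prefix
```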
36 changes: 30 additions & 6 deletions packages/deploy/src/types.ts
@@ -22,12 +22,36 @@ export type SpecJob = {
credential: string | null;
};

export type Trigger = {
id?: string;
type?: string;
export type StateKafkaHost = [string, string];

export type StateKafkaConfiguration = {
hosts: StateKafkaHost[];
topics: string[];
initial_offset_reset_policy: string;
connect_timeout: number;
};

export type SpecKafkaConfiguration = {
hosts: string[];
topics: string[];
initial_offset_reset_policy: string;
connect_timeout: number;
};

export type SpecTrigger = {
type: string;
cron_expression?: string;
enabled?: boolean;
kafka_configuration?: SpecKafkaConfiguration;
};

export type StateTrigger = {
id: string;
type: string;
cron_expression?: string;
delete?: boolean;
enabled?: boolean;
kafka_configuration?: StateKafkaConfiguration;
};

export type StateEdge = {
@@ -55,7 +79,7 @@ export type WorkflowSpec = {
id?: string;
name: string;
jobs?: Record<string | symbol, SpecJob>;
triggers?: Record<string | symbol, Trigger>;
triggers?: Record<string | symbol, SpecTrigger>;
edges?: Record<string | symbol, SpecEdge>;
};

@@ -81,7 +105,7 @@ export interface WorkflowState {
id: string;
name: string;
jobs: Record<string | symbol, Concrete<StateJob>>;
triggers: Record<string | symbol, Concrete<Trigger>>;
triggers: Record<string | symbol, Concrete<StateTrigger>>;
edges: Record<string | symbol, Concrete<StateEdge>>;
delete?: boolean;
project_id?: string;
@@ -109,7 +133,7 @@ export interface ProjectPayload {
name: string;
project_id?: string;
jobs: Concrete<StateJob>[];
triggers: Concrete<Trigger>[];
triggers: Concrete<StateTrigger>[];
edges: Concrete<StateEdge>[];
}[];
}
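The type changes above split the old `Trigger` type into `SpecTrigger` (what users write, with `hosts` as `host:port` strings) and `StateTrigger` (what is stored in state and sent to Lightning, with `hosts` as `[host, port]` tuples). A minimal sketch of the two shapes side by side, with the tuple alias copied from the diff and illustrative values:

```ts
// Copied from the diff above; values below are illustrative only.
type StateKafkaHost = [string, string];

// As written in the spec (SpecKafkaConfiguration):
const specConfig = {
  hosts: ['localhost:9092'],
  topics: ['test'],
  initial_offset_reset_policy: 'earliest',
  connect_timeout: 30,
};

// As stored in state / sent to Lightning (StateKafkaConfiguration):
const stateConfig = {
  hosts: [['localhost', '9092']] as StateKafkaHost[],
  topics: ['test'],
  initial_offset_reset_policy: 'earliest',
  connect_timeout: 30,
};
```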
79 changes: 79 additions & 0 deletions packages/deploy/test/stateTransform.test.ts
@@ -352,6 +352,85 @@ test('toNextState removing a job and edge', (t) => {
t.deepEqual(result, existingState);
});

test('toNextState with kafka trigger', (t) => {
const state = { workflows: {} };
const spec = {
name: 'my project',
description: 'for the humans',
workflows: {
dummyWorkflow: {
name: 'workflow one',
jobs: {
'new-job': {
name: 'new job',
adaptor: '@openfn/language-adaptor',
body: 'foo()',
},
},
triggers: {
kafka: {
type: 'kafka',
enabled: true,
kafka_configuration: {
hosts: ['localhost:9092'],
topics: ['test'],
connect_timeout: 30,
initial_offset_reset_policy: 'earliest',
},
},
},
edges: {},
},
},
};

let result = mergeSpecIntoState(state, spec);

let expectedHosts = [['localhost', '9092']];

t.deepEqual(
result.workflows.dummyWorkflow.triggers.kafka.kafka_configuration.hosts,
expectedHosts
);

// deploy error is raised when host is incorrect
const badHosts = ['localhost', 'http://localhost:9092'];
badHosts.forEach((badHost) => {
const badSpec = {
name: 'my project',
description: 'for the humans',
workflows: {
dummyWorkflow: {
name: 'workflow one',
jobs: {},
edges: {},
triggers: {
kafka: {
type: 'kafka',
enabled: true,
kafka_configuration: {
hosts: [badHost],
topics: ['test'],
connect_timeout: 30,
initial_offset_reset_policy: 'earliest',
},
},
},
},
},
};

t.throws(
() => {
mergeSpecIntoState({ workflows: {} }, badSpec);
},
{
message: `Kafka host must be specified in the format host:port, found: ${badHost}`,
}
);
});
});

test('mergeProjectIntoState with no changes', (t) => {
let existingState = fullExampleState();
const workflowOne = existingState.workflows['workflow-one'];
