From c9d22a94b6df66b1290eb48d7751921dc0d3c05d Mon Sep 17 00:00:00 2001 From: HugoCasa Date: Wed, 7 May 2025 18:56:16 +0200 Subject: [PATCH] feat: preprocessor new format --- docs/advanced/17_email_triggers/index.mdx | 15 +- docs/core_concepts/39_http_routing/index.mdx | 61 +- .../40_websocket_triggers/index.mdx | 18 +- .../core_concepts/41_kafka_triggers/index.mdx | 26 +- docs/core_concepts/43_preprocessors/index.mdx | 649 +++++++++++------- docs/core_concepts/45_nats_triggers/index.mdx | 32 +- docs/core_concepts/48_sqs_triggers/index.mdx | 54 +- docs/core_concepts/49_mqtt_triggers/index.mdx | 55 +- docs/core_concepts/50_gcp_triggers/index.mdx | 30 +- 9 files changed, 542 insertions(+), 398 deletions(-) diff --git a/docs/advanced/17_email_triggers/index.mdx b/docs/advanced/17_email_triggers/index.mdx index 4e1833cd..1568c9eb 100644 --- a/docs/advanced/17_email_triggers/index.mdx +++ b/docs/advanced/17_email_triggers/index.mdx @@ -64,16 +64,19 @@ And if you use a [preprocessor](../../core_concepts/43_preprocessors/index.mdx), ```TypeScript export async function preprocessor( - raw_email: string, - parsed_email: any, - wm_trigger: { + event: { kind: "email", - }, + raw_email: string, + parsed_email: any, + } ) { + if (event.kind !== "email") { + throw new Error("Expected an email event"); + } // return what you want to pass to the main function, for instance the sender email address and the email body return { - sender_address: parsed_email.headers["From"][0].address, - email_body: parsed_email.text_body + sender_address: event.parsed_email.headers["From"][0].address, + email_body: event.parsed_email.text_body } } diff --git a/docs/core_concepts/39_http_routing/index.mdx b/docs/core_concepts/39_http_routing/index.mdx index 6520d806..144c12a0 100644 --- a/docs/core_concepts/39_http_routing/index.mdx +++ b/docs/core_concepts/39_http_routing/index.mdx @@ -75,29 +75,31 @@ export async function main(/* args from the request body */) { With a preprocessor: ```ts export async function preprocessor( - name: string, - age: number, - wm_trigger: { - kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'gcp', - http: { - route: string; - path: string; - method: string; - params: Record; - query: Record; - headers: Record; - } + event: { + kind: 'http', + body: { // assuming the body contains name and age parameters + name: string, + age: number, + }, + raw_string: string | null, + route: string; + path: string; + method: string; + params: Record; + query: Record; + headers: Record; } ) { - if (wm_trigger.kind === 'http' && wm_trigger.http) { + if (event.kind === 'http') { + const { name, age } = event.body; return { - user_id: wm_trigger.http.params.id, + user_id: event.params.id, name, - age + age, }; } - throw new Error(`Expected trigger of kind 'http', but received: ${wm_trigger.kind}`); + throw new Error(`Expected trigger of kind 'http', but received: ${event.kind}`); } export async function main(user_id: string, name: string, age: number) { @@ -204,29 +206,28 @@ Example script for HMAC signature validation: const SECRET_KEY_VARIABLE_PATH = "u/admin/well_backlit_variable"; export async function main( - wm_trigger: { + event: { kind: 'http', - http?: { - route: string; - path: string; - method: string; - params: Record; - query: Record; - headers: Record; - }; + body: any + raw_string: string, + route: string; + path: string; + method: string; + params: Record; + query: Record; + headers: Record; }, - raw_string: string ) { - if (!wm_trigger.http) { - throw new Error('Missing HTTP context'); + if (event.kind !== 'http') { + throw new Error('Expected a http event'); } - const signature = wm_trigger.http.headers['x-signature'] || wm_trigger.http.headers['signature']; + const signature = event.headers['x-signature'] || event.headers['signature']; if (!signature) { throw new Error('Missing signature in request headers.'); } - const timestamp = wm_trigger.http.headers['x-timestamp'] || wm_trigger.http.headers['timestamp']; + const timestamp = event.headers['x-timestamp'] || event.headers['timestamp']; if (timestamp) { const timestampValue = parseInt(timestamp, 10); const currentTime = Math.floor(Date.now() / 1000); diff --git a/docs/core_concepts/40_websocket_triggers/index.mdx b/docs/core_concepts/40_websocket_triggers/index.mdx index 1eec696a..47b3ee46 100644 --- a/docs/core_concepts/40_websocket_triggers/index.mdx +++ b/docs/core_concepts/40_websocket_triggers/index.mdx @@ -30,22 +30,24 @@ And if you use a [preprocessor](../43_preprocessors/index.mdx), the script could ```TypeScript export async function preprocessor( - msg: string, - wm_trigger: { + event: { kind: "websocket", - websocket: { - url: string // the WebSocket URL - } - }, + msg: string, + url: string, + } ) { + if (event.kind !== "websocket") { + throw new Error(`Expected a websocket event`); + } + // assuming the message is a JSON object - const msg = JSON.parse(msg); + const msg = JSON.parse(event.msg); // define args for the main function // let's assume we want to use the message content and the url return { message_content: msg.content, - url: wm_trigger.websocket.url + url: event.url }; } diff --git a/docs/core_concepts/41_kafka_triggers/index.mdx b/docs/core_concepts/41_kafka_triggers/index.mdx index 0cbea5b6..e3025ad7 100644 --- a/docs/core_concepts/41_kafka_triggers/index.mdx +++ b/docs/core_concepts/41_kafka_triggers/index.mdx @@ -15,12 +15,12 @@ The group id is automatically filled in from the current workspace and the trigg It indicates the consumer group to which the trigger belongs. This garantees that even if the trigger stops listening for a while, it will receive the messages it missed when it starts listening again. Once the Kafka resource and settings are set, select the runnable that should be triggered by this trigger. -The received webhook message will be passed to the runnable as a string argument called `msg`. +The received webhook base64 encoded payload will be passed to the runnable as a string argument called `payload`. Here's an example script: ```TypeScript -export async function main(msg: string) { +export async function main(payload: string) { // do something with the message } ``` @@ -29,24 +29,26 @@ And if you use a [preprocessor](../43_preprocessors/index.mdx), the script could ```TypeScript export async function preprocessor( - msg: string, - wm_trigger: { + event: { kind: "kafka", - kafka: { - brokers: string[]; - topic: string; // the specific topic the message was received from - group_id: string; - } - }, + payload: string, // base64 encoded payload + brokers: string[]; + topic: string; // the specific topic the message was received from + group_id: string; + } ) { + if (event.kind !== "kafka") { + throw new Error(`Expected a kafka event`); + } + // assuming the message is a JSON object - const msg = JSON.parse(msg); + const msg = JSON.parse(atob(event.payload)); // define args for the main function // let's assume we want to use the message content and the topic return { message_content: msg.content, - topic: wm_trigger.kafka.topic + topic: event.topic }; } diff --git a/docs/core_concepts/43_preprocessors/index.mdx b/docs/core_concepts/43_preprocessors/index.mdx index 621f98f1..1b0f162d 100644 --- a/docs/core_concepts/43_preprocessors/index.mdx +++ b/docs/core_concepts/43_preprocessors/index.mdx @@ -9,15 +9,10 @@ The preprocessor is only called when the runnable is triggered via a [webhook](. This approach is useful for preprocessing arguments differently depending on the trigger before the execution of the runnable. It also separates the handling of arguments according to whether they are called by a trigger or from the UI, which can help you keep a simple schema form in the UI for the runnable. -The preprocessor receives trigger metadata (`wm_trigger`) along with the main trigger arguments. -The structure of `wm_trigger` and the main trigger arguments are specific to each trigger type: +The preprocessor receives an `event` parameter, which contains all the main trigger data plus additional metadata. +The object always contain a `kind` field that contains the type of trigger. Other arguments are specific to the trigger type. -- Webhook/HTTP: `(wm_trigger: { kind: 'http' | 'webhook', http?: { ... } }, body_key_1: any, body_key_2: any, ...)` -- Postgres: `(wm_trigger: { kind: 'postgres' }, transaction_type: string, schema_name: string, table_name: string, row: any)` -- WebSocket/Kafka/NATS/SQS/MQTT/GCP: `(wm_trigger: { kind: 'websocket' | 'kafka' | 'nats' | 'sqs' | 'mqtt' | 'gcp', [kind]: { ... } }, msg: string)` -- Email: `(wm_trigger: { kind: 'email' }, raw_email: string, parsed_email: { ... })` - -You can find more details about the arguments format and the structure of `wm_trigger` for each trigger in their respective documentation pages. +You can find more details about the arguments format and the structure of `event` for each trigger kind in their respective documentation pages, or below in the templates. Preprocessors can only be written in [TypeScript](../../getting_started/0_scripts_quickstart/1_typescript_quickstart/index.mdx) (Bun/Deno) or [Python](../../getting_started/0_scripts_quickstart/2_python_quickstart/index.mdx). @@ -34,77 +29,103 @@ Here are some templates for scripts with preprocessors in [TypeScript](../../get ```TypeScript -export async function preprocessor( - /* - * Replace this comment with the parameters received from the trigger. - * Examples: `body_key_1`, `body_key_2` for Webhook/HTTP, `msg` for WebSocket, etc. - */ - - // The trigger metadata - wm_trigger: { - kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp', - http?: { - route: string // The route path, e.g. "/users/:id" - path: string // The actual path called, e.g. "/users/123" - method: string - params: Record // path parameters - query: Record // query parameters - headers: Record - }, - websocket?: { - url: string // The websocket url - }, - kafka?: { - brokers: string[] - topic: string - group_id: string - }, - nats?: { - servers: string[] - subject: string - headers?: Record - status?: number - description?: string - length: number - }, - sqs?: { - queue_url: string, - message_id?: string, - receipt_handle?: string, - attributes: Record, - message_attributes?: Record - }, - mqtt?: { - topic: string, - retain: boolean, - pkid: number, - qos: number, - v5?: { - payload_format_indicator?: number, - topic_alias?: number, - response_topic?: string, - correlation_data?: Array, - user_properties?: Array<[string, string]>, - subscription_identifiers?: Array, - content_type?: string +export async function preprocessor( + event: + | { + kind: "webhook"; + body: any, + raw_string: string | null, + query: Record; + headers: Record; + } + | { + kind: "http"; + body: any, + raw_string: string | null, + route: string; + path: string; + method: string; + params: Record; + query: Record; + headers: Record; + } + | { + kind: "email"; + parsed_email: any, + raw_email: string, + } + | { kind: "websocket"; msg: string; url: string } + | { + kind: "kafka"; + payload: string; + brokers: string[]; + topic: string; + group_id: string; + } + | { + kind: "nats"; + payload: string; + servers: string[]; + subject: string; + headers?: Record; + status?: number; + description?: string; + length: number; + } + | { + kind: "sqs"; + msg: string, + queue_url: string; + message_id?: string; + receipt_handle?: string; + attributes: Record; + message_attributes?: Record< + string, + { string_value?: string; data_type: string } + >; + } + | { + kind: "mqtt"; + payload: string, + topic: string; + retain: boolean; + pkid: number; + qos: number; + v5?: { + payload_format_indicator?: number; + topic_alias?: number; + response_topic?: string; + correlation_data?: Array; + user_properties?: Array<[string, string]>; + subscription_identifiers?: Array; + content_type?: string; + }; + } + | { + kind: "gcp"; + payload: string, + message_id: string; + subscription: string; + ordering_key?: string; + attributes?: Record; + delivery_type: "push" | "pull"; + headers?: Record; + publish_time?: string; } - }, - gcp?: { - message_id: string, - subscription: string, - ordering_key?: string, - attributes?: Record, - delivery_type: "push" | "pull", - headers?: Record, - publish_time?: string, - } - } ) { - return { - // return the args to be passed to the runnable + // you can use a switch statement to handle different trigger kinds + switch (event.kind) { + case "webhook": + return { + // return the args to be passed to the runnable + }; + case "http": + return { + // return the args to be passed to the runnable + }; + // ... + default: + throw new Error(`Unsupported trigger kind: ${event.kind}`); } } @@ -116,87 +137,132 @@ export async function main(/* main args */) { ```Python -from typing import TypedDict, Literal -class Http(TypedDict): - route: str # The route path, e.g. "/users/:id" - path: str # The actual path called, e.g. "/users/123" +from typing import TypedDict, Literal, Optional, Union + +class WebhookEvent(TypedDict): + kind: Literal["webhook"] + body: dict + raw_string: Optional[str] + query: dict[str, str] + headers: dict[str, str] + + +class HttpEvent(TypedDict): + kind: Literal["http"] + body: dict + raw_string: Optional[str] + route: str + path: str method: str params: dict[str, str] query: dict[str, str] headers: dict[str, str] -class Websocket(TypedDict): - url: str # The websocket url -class Kafka(TypedDict): - topic: str +class EmailEvent(TypedDict): + kind: Literal["email"] + parsed_email: dict + raw_email: str + + +class WebsocketEvent(TypedDict): + kind: Literal["websocket"] + msg: str + url: str + + +class KafkaEvent(TypedDict): + kind: Literal["kafka"] + payload: str brokers: list[str] + topic: str group_id: str -class Nats(TypedDict): + +class NatsEvent(TypedDict): + kind: Literal["nats"] + payload: str servers: list[str] subject: str - headers: dict[str, list[str]] | None - status: int | None - description: str | None + headers: Optional[dict[str, list[str]]] + status: Optional[int] + description: Optional[str] length: int + class MessageAttribute(TypedDict): - string_value: str | None + string_value: Optional[str] data_type: str -class Sqs(TypedDict): + +class SqsEvent(TypedDict): + kind: Literal["sqs"] + msg: str queue_url: str - message_id: str | None - receipt_handle: str | None + message_id: Optional[str] + receipt_handle: Optional[str] attributes: dict[str, str] - message_attributes: dict[str, MessageAttribute] | None - -class MqttV5Properties: - payload_format_indicator: int | None - topic_alias: int | None - response_topic: str | None - correlation_data: list[int] | None - user_properties: list[tuple[str, str]] | None - subscription_identifiers: list[int] | None - content_type: str | None - -class Mqtt(TypedDict): + message_attributes: Optional[dict[str, MessageAttribute]] + + +class MqttV5Properties(TypedDict, total=False): + payload_format_indicator: Optional[int] + topic_alias: Optional[int] + response_topic: Optional[str] + correlation_data: Optional[list[int]] + user_properties: Optional[list[tuple[str, str]]] + subscription_identifiers: Optional[list[int]] + content_type: Optional[str] + + +class MqttEvent(TypedDict): + kind: Literal["mqtt"] + payload: str topic: str retain: bool pkid: int qos: int - v5: MqttV5Properties | None + v5: Optional[MqttV5Properties] + -class Gcp(TypedDict): +class GcpEvent(TypedDict): + kind: Literal["gcp"] + payload: str message_id: str subscription: str - ordering_key: str | None - attributes: dict[str, str] | None + ordering_key: Optional[str] + attributes: Optional[dict[str, str]] delivery_type: Literal["push", "pull"] - headers: dict[str, str] | None - publish_time: str | None - -class WmTrigger(TypedDict): - kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres", "sqs", "mqtt", "gcp"] - http: Http | None - websocket: Websocket | None - kafka: Kafka | None - nats: Nats | None - sqs: Sqs | None - mqtt: Mqtt | None - gcp: Gcp | None - -def preprocessor( - # Replace this comment with the parameters received from the trigger. - # Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc. - - # Trigger metadata - wm_trigger: WmTrigger, -): - return { - # return the args to be passed to the runnable - } + headers: Optional[dict[str, str]] + publish_time: Optional[str] + +Event = Union[ + WebhookEvent, + HttpEvent, + EmailEvent, + WebsocketEvent, + KafkaEvent, + NatsEvent, + SqsEvent, + MqttEvent, + GcpEvent, +] + + +def preprocessor(event: Event): + # you can use a switch statement to handle different trigger kinds + match event["kind"]: + case "webhook": + return { + # return the args to be passed to the runnable + } + case "http": + return { + # return the args to be passed to the runnable + } + # ... + case _: + raise ValueError(f"Unsupported trigger kind: {event['kind']}") def main(): # add the parameters you expect from the preprocessor # your code here @@ -224,77 +290,103 @@ Below you'll find preprocessor step templates for flows in [TypeScript](../../ge ```TypeScript -export async function preprocessor( - /* - * Replace this comment with the parameters received from the trigger. - * Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc. - */ - - // The trigger metadata - wm_trigger: { - kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt' | 'gcp', - http?: { - route: string // The route path, e.g. "/users/:id" - path: string // The actual path called, e.g. "/users/123" - method: string - params: Record // path parameters - query: Record // query parameters - headers: Record - }, - websocket?: { - url: string // The websocket url - }, - kafka?: { - brokers: string[] - topic: string - group_id: string - }, - nats?: { - servers: string[] - subject: string - headers?: Record - status?: number - description?: string - length: number - }, - sqs?: { - queue_url: string, - message_id?: string, - receipt_handle?: string, - attributes: Record, - message_attributes?: Record - }, - mqtt?: { - topic: string, - retain: boolean, - pkid: number, - qos: number, - v5?: { - payload_format_indicator?: number, - topic_alias?: number, - response_topic?: string, - correlation_data?: Array, - user_properties?: Array<[string, string]>, - subscription_identifiers?: Array, - content_type?: string +export async function preprocessor( + event: + | { + kind: "webhook"; + body: any, + raw_string: string | null, + query: Record; + headers: Record; + } + | { + kind: "http"; + body: any, + raw_string: string | null, + route: string; + path: string; + method: string; + params: Record; + query: Record; + headers: Record; + } + | { + kind: "email"; + parsed_email: any, + raw_email: string, + } + | { kind: "websocket"; msg: string; url: string } + | { + kind: "kafka"; + payload: string; + brokers: string[]; + topic: string; + group_id: string; + } + | { + kind: "nats"; + payload: string; + servers: string[]; + subject: string; + headers?: Record; + status?: number; + description?: string; + length: number; + } + | { + kind: "sqs"; + msg: string, + queue_url: string; + message_id?: string; + receipt_handle?: string; + attributes: Record; + message_attributes?: Record< + string, + { string_value?: string; data_type: string } + >; + } + | { + kind: "mqtt"; + payload: string, + topic: string; + retain: boolean; + pkid: number; + qos: number; + v5?: { + payload_format_indicator?: number; + topic_alias?: number; + response_topic?: string; + correlation_data?: Array; + user_properties?: Array<[string, string]>; + subscription_identifiers?: Array; + content_type?: string; + }; + } + | { + kind: "gcp"; + payload: string, + message_id: string; + subscription: string; + ordering_key?: string; + attributes?: Record; + delivery_type: "push" | "pull"; + headers?: Record; + publish_time?: string; } - }, - gcp?: { - message_id: string, - subscription: string, - ordering_key?: string, - attributes?: Record, - delivery_type: "push" | "pull", - headers?: Record, - publish_time?: string, - } - } ) { - return { - // return the args to be passed to the runnable + // you can use a switch statement to handle different trigger kinds + switch (event.kind) { + case "webhook": + return { + // return the args to be passed to the runnable + }; + case "http": + return { + // return the args to be passed to the runnable + }; + // ... + default: + throw new Error(`Unsupported trigger kind: ${event.kind}`); } } ``` @@ -302,87 +394,132 @@ export async function preprocessor( ```Python -from typing import TypedDict, Literal -class Http(TypedDict): - route: str # The route path, e.g. "/users/:id" - path: str # The actual path called, e.g. "/users/123" +from typing import TypedDict, Literal, Optional, Union + +class WebhookEvent(TypedDict): + kind: Literal["webhook"] + body: dict + raw_string: Optional[str] + query: dict[str, str] + headers: dict[str, str] + + +class HttpEvent(TypedDict): + kind: Literal["http"] + body: dict + raw_string: Optional[str] + route: str + path: str method: str params: dict[str, str] query: dict[str, str] headers: dict[str, str] -class Websocket(TypedDict): - url: str # The websocket url -class Kafka(TypedDict): - topic: str +class EmailEvent(TypedDict): + kind: Literal["email"] + parsed_email: dict + raw_email: str + + +class WebsocketEvent(TypedDict): + kind: Literal["websocket"] + msg: str + url: str + + +class KafkaEvent(TypedDict): + kind: Literal["kafka"] + payload: str brokers: list[str] + topic: str group_id: str -class Nats(TypedDict): + +class NatsEvent(TypedDict): + kind: Literal["nats"] + payload: str servers: list[str] subject: str - headers: dict[str, list[str]] | None - status: int | None - description: str | None + headers: Optional[dict[str, list[str]]] + status: Optional[int] + description: Optional[str] length: int + class MessageAttribute(TypedDict): - string_value: str | None + string_value: Optional[str] data_type: str -class Sqs(TypedDict): + +class SqsEvent(TypedDict): + kind: Literal["sqs"] + msg: str queue_url: str - message_id: str | None - receipt_handle: str | None + message_id: Optional[str] + receipt_handle: Optional[str] attributes: dict[str, str] - message_attributes: dict[str, MessageAttribute] | None - -class MqttV5Properties: - payload_format_indicator: int | None - topic_alias: int | None - response_topic: str | None - correlation_data: list[int] | None - user_properties: list[tuple[str, str]] | None - subscription_identifiers: list[int] | None - content_type: str | None - -class Mqtt(TypedDict): + message_attributes: Optional[dict[str, MessageAttribute]] + + +class MqttV5Properties(TypedDict, total=False): + payload_format_indicator: Optional[int] + topic_alias: Optional[int] + response_topic: Optional[str] + correlation_data: Optional[list[int]] + user_properties: Optional[list[tuple[str, str]]] + subscription_identifiers: Optional[list[int]] + content_type: Optional[str] + + +class MqttEvent(TypedDict): + kind: Literal["mqtt"] + payload: str topic: str retain: bool pkid: int qos: int - v5: MqttV5Properties | None + v5: Optional[MqttV5Properties] + -class Gcp(TypedDict): +class GcpEvent(TypedDict): + kind: Literal["gcp"] + payload: str message_id: str subscription: str - ordering_key: str | None - attributes: dict[str, str] | None + ordering_key: Optional[str] + attributes: Optional[dict[str, str]] delivery_type: Literal["push", "pull"] - headers: dict[str, str] | None - publish_time: str | None - -class WmTrigger(TypedDict): - kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres", "sqs", "mqtt", "gcp"] - http: Http | None - websocket: Websocket | None - kafka: Kafka | None - nats: Nats | None - sqs: Sqs | None - mqtt: Mqtt | None - gcp: Gcp | None - -def preprocessor( - # Replace this comment with the parameters received from the trigger. - # Examples: `bodyKey1`, `bodyKey2` for Webhook/HTTP, `msg` for WebSocket, etc. - - # Trigger metadata - wm_trigger: WmTrigger, -): - return { - # return the args to be passed to the runnable - } + headers: Optional[dict[str, str]] + publish_time: Optional[str] + +Event = Union[ + WebhookEvent, + HttpEvent, + EmailEvent, + WebsocketEvent, + KafkaEvent, + NatsEvent, + SqsEvent, + MqttEvent, + GcpEvent, +] + + +def preprocessor(event: Event): + # you can use a switch statement to handle different trigger kinds + match event["kind"]: + case "webhook": + return { + # return the args to be passed to the runnable + } + case "http": + return { + # return the args to be passed to the runnable + } + # ... + case _: + raise ValueError(f"Unsupported trigger kind: {event['kind']}") ``` diff --git a/docs/core_concepts/45_nats_triggers/index.mdx b/docs/core_concepts/45_nats_triggers/index.mdx index b9a243f4..3d09434f 100644 --- a/docs/core_concepts/45_nats_triggers/index.mdx +++ b/docs/core_concepts/45_nats_triggers/index.mdx @@ -16,12 +16,12 @@ Specify the subjects the trigger should listen to (wildcards are supported). Only one subject is supported in core NATS. Once the NATS resource and settings are set, select the runnable that should be triggered by this trigger. -The received webhook message will be passed to the runnable as a string argument called `msg`. +The received webhook base64 encoded payload will be passed to the runnable as a string argument called `payload`. Here's an example script: ```TypeScript -export async function main(msg: string) { +export async function main(payload: string) { // do something with the message } ``` @@ -30,27 +30,29 @@ And if you use a [preprocessor](../43_preprocessors/index.mdx), the script could ```TypeScript export async function preprocessor( - msg: string, - wm_trigger: { + event: { kind: "nats", - nats: { - servers: string[]; - subject: string; // the specific subject the message was received from - length: number; - headers?: Record; - status?: number; - description?: string; - } - }, + payload: string, // base64 encoded payload + servers: string[]; + subject: string; // the specific subject the message was received from + length: number; + headers?: Record; + status?: number; + description?: string; + } ) { + if (event.kind !== "nats") { + throw new Error(`Expected a nats event`); + } + // assuming the message is a JSON object - const msg = JSON.parse(msg); + const msg = JSON.parse(atob(event.payload)); // define args for the main function // let's assume we want to use the message content and the subject return { message_content: msg.content, - subject: wm_trigger.nats.subject + subject: event.subject }; } diff --git a/docs/core_concepts/48_sqs_triggers/index.mdx b/docs/core_concepts/48_sqs_triggers/index.mdx index 02de00a0..cbf3aea3 100644 --- a/docs/core_concepts/48_sqs_triggers/index.mdx +++ b/docs/core_concepts/48_sqs_triggers/index.mdx @@ -68,36 +68,38 @@ If you use a [preprocessor](../43_preprocessors/index.mdx), the preprocessor fun ```TypeScript export async function preprocessor( + event: { + kind: "sqs", msg: string, - wm_trigger: { - kind: "sqs", - sqs: { - queue_url: string, - message_id?: string, - receipt_handle?: string, - attributes: Record, - message_attributes?: Record - } - }, + queue_url: string, + message_id?: string, + receipt_handle?: string, + attributes: Record, + message_attributes?: Record + } ) { - // assuming the message is a JSON object - const data = JSON.parse(msg); - - return { - content: data.content, - metadata: { - sentAt: wm_trigger.sqs.attributes.SentTimestamp, - messageId: wm_trigger.sqs.message_id - } - }; + if (event.kind !== "sqs") { + throw new Error(`Expected a SQS event`); + } + + // assuming the message is a JSON object + const data = JSON.parse(event.msg); + + return { + content: data.content, + metadata: { + sentAt: event.attributes.SentTimestamp, + messageId: event.message_id + } + }; } export async function main(content: string, metadata: { sentAt: string, messageId: string }) { - // Process transformed message data - console.log(`Processing message ${metadata.messageId} sent at ${metadata.sentAt}`); - console.log("Content:", content); + // Process transformed message data + console.log(`Processing message ${metadata.messageId} sent at ${metadata.sentAt}`); + console.log("Content:", content); } ``` \ No newline at end of file diff --git a/docs/core_concepts/49_mqtt_triggers/index.mdx b/docs/core_concepts/49_mqtt_triggers/index.mdx index 42a4ba03..60761d98 100644 --- a/docs/core_concepts/49_mqtt_triggers/index.mdx +++ b/docs/core_concepts/49_mqtt_triggers/index.mdx @@ -64,9 +64,9 @@ Below are code examples demonstrating how to handle MQTT messages in your Windmi ### Basic script ```typescript -export async function main(payload: Array) { - // Convert binary payload to string - const textPayload = new TextDecoder().decode(new Uint8Array(payload)); +export async function main(payload: string) { + // Convert base64 encoded payload to string + const textPayload = atob(payload); // Parse JSON if applicable try { @@ -85,7 +85,7 @@ export async function main(payload: Array) { ### Script with preprocessor -If you use a [preprocessor](../43_preprocessors/index.mdx), the preprocessor function receives the message payload as byte array and an MQTT object with the following fields: +If you use a [preprocessor](../43_preprocessors/index.mdx), the preprocessor function receives the message payload as base64 encoded string and an MQTT object with the following fields: #### MQTT object @@ -124,44 +124,41 @@ For more information about MQTT v5 properties, see the [MQTT v5 Properties Docum * - `{a: 1, b: 2}` → `main(a, b)` * - `{payload}` → `main(payload)` * - * @param wm_trigger - Trigger details (e.g., MQTT, HTTP) - * @param payload - Raw trigger data (format varies by trigger type) + * @param event - Trigger data and metadata (e.g., MQTT, HTTP) * @returns Processed data for `main()` */ export async function preprocessor( - wm_trigger: { - kind: 'gcp', - mqtt?: { - topic: string, - retain: boolean, - pkid: number, - qos: number, - v5?: { - payload_format_indicator?: number, - topic_alias?: number, - response_topic?: string, - correlation_data?: Array, - user_properties?: Array<[string, string]>, - subscription_identifiers?: Array, - content_type?: string - } + event: { + kind: "mqtt", + payload: string, // base64 encoded payload + topic: string, + retain: boolean, + pkid: number, + qos: number, + v5?: { + payload_format_indicator?: number, + topic_alias?: number, + response_topic?: string, + correlation_data?: Array, + user_properties?: Array<[string, string]>, + subscription_identifiers?: Array, + content_type?: string } - }, - payload: Array, + + } ) { - if (wm_trigger.kind === 'mqtt' && wm_trigger.mqtt) { - const uint8Payload = new Uint8Array(payload); - const payloadAsString = new TextDecoder().decode(uint8Payload); + if (event.kind === 'mqtt') { + const payloadAsString = atob(event.payload); return { - contentType: wm_trigger.mqtt.v5?.content_type, + contentType: event.v5?.content_type, payload: uint8Payload, payloadAsString }; } // We assume the script is triggered by an MQTT message, which is why an error is thrown for other trigger kinds. // If the script is intended to support other triggers, update this logic to handle the respective trigger kind. - throw new Error(`Expected mqtt trigger kind got: ${wm_trigger.kind}`) + throw new Error(`Expected mqtt trigger kind got: ${event.kind}`) } /** diff --git a/docs/core_concepts/50_gcp_triggers/index.mdx b/docs/core_concepts/50_gcp_triggers/index.mdx index 162c2210..2fea22f7 100644 --- a/docs/core_concepts/50_gcp_triggers/index.mdx +++ b/docs/core_concepts/50_gcp_triggers/index.mdx @@ -112,24 +112,22 @@ Example preprocessor: ```typescript export async function preprocessor( - wm_trigger: { + event: { kind: 'gcp', - gcp?: { - message_id: string, - subscription: string, - ordering_key?: string, - attributes?: Record, - delivery_type: "push" | "pull", - headers?: Record, - publish_time?: string, - } - }, - payload: string, + payload: string, // base64 encoded payload + message_id: string, + subscription: string, + ordering_key?: string, + attributes?: Record, + delivery_type: "push" | "pull", + headers?: Record, + publish_time?: string, + } ) { - if (wm_trigger.kind === 'gcp' && wm_trigger.gcp) { - const decodedString = new TextDecoder().decode(Uint8Array.from(atob(payload), c => c.charCodeAt(0))); + if (event.kind === 'gcp') { + const decodedString = atob(event.payload); - const attributes = wm_trigger.gcp.attributes || {}; + const attributes = event.attributes || {}; const contentType = attributes['content-type'] || attributes['Content-Type']; const isJson = contentType === 'application/json'; @@ -150,7 +148,7 @@ export async function preprocessor( }; } - throw new Error(`Expected gcp trigger kind got: ${wm_trigger.kind}`); + throw new Error(`Expected gcp trigger kind got: ${event.kind}`); } ```