Merge remote-tracking branch 'upstream/main'
grutt committed Feb 5, 2024
2 parents 9c2af5f + 703c93c commit e717811
Showing 31 changed files with 1,706 additions and 255 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/typescript-sdk-publish.yml
@@ -34,12 +34,8 @@ jobs:
run: pnpm install
working-directory: typescript-sdk

- name: Build SDK
run: pnpm build
working-directory: typescript-sdk

- name: Publish SDK
run: pnpm publish --access public
- name: Build and Publish SDK
run: pnpm publish:ci
working-directory: typescript-sdk
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
12 changes: 6 additions & 6 deletions frontend/docs/pages/home/go-sdk/_meta.json
@@ -1,7 +1,7 @@
{
"setup": "Setup",
"creating-a-worker": "Creating a Worker",
"creating-a-workflow": "Creating a Workflow",
"pushing-events": "Pushing Events",
"scheduling-workflows": "Scheduling Workflows"
}
"setup": "Setup",
"creating-a-workflow": "Creating a Workflow",
"creating-a-worker": "Creating a Worker",
"pushing-events": "Pushing Events",
"scheduling-workflows": "Scheduling Workflows"
}
10 changes: 5 additions & 5 deletions frontend/docs/pages/home/python-sdk/_meta.json
@@ -1,6 +1,6 @@
{
"setup": "Setup",
"creating-a-worker": "Creating a Worker",
"creating-a-workflow": "Creating a Workflow",
"pushing-events": "Pushing Events"
}
"setup": "Setup",
"creating-a-workflow": "Creating a Workflow",
"creating-a-worker": "Creating a Worker",
"pushing-events": "Pushing Events"
}
10 changes: 5 additions & 5 deletions frontend/docs/pages/home/typescript-sdk/_meta.json
@@ -1,6 +1,6 @@
{
"setup": "Setup",
"creating-a-worker": "Creating a Worker",
"creating-a-workflow": "Creating a Workflow",
"pushing-events": "Pushing Events"
}
"setup": "Setup",
"creating-a-workflow": "Creating a Workflow",
"creating-a-worker": "Creating a Worker",
"pushing-events": "Pushing Events"
}
27 changes: 9 additions & 18 deletions frontend/docs/pages/home/typescript-sdk/creating-a-worker.mdx
@@ -6,30 +6,21 @@ It will automatically read in any `HATCHET_CLIENT` environment variables, which
For example:

```ts
import Hatchet from "@hatchet/sdk";
import { Workflow } from "@hatchet/sdk";
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";

const hatchet = Hatchet.init();

// workflow code...

hatchet.run(workflow);
```

## Advanced Usage

The `hatchet.run(workflow)` method will return a reference to the `Worker` object.

You could then stop the worker:
async function main() {
const worker = await hatchet.worker("example-worker");
await worker.registerWorkflow(workflow);
worker.start();
}

```ts
const worker = hatchet.run(workflow);
worker.stop();
main();
```

Alternatively, you can create a worker object with `hatchet.worker()` and manually start the service.
## Options

```ts
const worker = hatchet.worker(workflow);
worker.start();
```
The `hatchet.worker()` method takes a simple name parameter which can be used to identify the worker on the Hatchet dashboard.
186 changes: 154 additions & 32 deletions frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx
@@ -4,8 +4,7 @@ To create a workflow, simply create a new `Workflow` object.
For example, a simple 2-step workflow would look like:

```ts
import Hatchet from "@hatchet/sdk";
import { Workflow } from "@hatchet/workflow";
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";

const hatchet = Hatchet.init();

@@ -18,15 +17,15 @@ const workflow: Workflow = {
steps: [
{
name: "step1",
run: (input, ctx) => {
run: (ctx) => {
console.log("executed step1!");
return { step1: "step1" };
},
},
{
name: "step2",
parents: ["step1"],
run: (input, ctx) => {
run: (ctx) => {
console.log("executed step2!");
return { step2: "step2" };
},
@@ -35,37 +34,40 @@ const workflow: Workflow = {
};
```

You'll notice that the workflow defines a workflow trigger (in this case, `on_events`), and the workflow definition. The workflow definition includes a series of steps which is simply an array of `Step` objects. Each step has a `run` prop which is a function that takes an `input` and a `context`. The `context` argument is a `Context` object, which contains information about the workflow, such as the input data and the output data of previous steps.
You'll notice that the workflow defines a workflow trigger (in this case, `on_events`), and the workflow definition. The workflow definition includes a series of steps, which is simply an array of `Step` objects.
Each step has a `run` prop, which is a function that takes a `context` argument. The `context` argument is a `Context` object, which contains information about the workflow, such as the input data and the output data of previous steps.

To create multi-step workflows, you can use `parents` to define the steps which the current step depends on. In the example, `step2` will not invoke until after `step1` completes.
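
To illustrate how `parents` constrains ordering, here is a minimal, self-contained sketch that computes a valid run order from the `parents` arrays. The `StepDef` type and the `executionOrder` function are hypothetical names used only for illustration; they are not part of the Hatchet SDK, and Hatchet's own scheduler may work differently.

```ts
// Hypothetical, simplified step shape (not the SDK's `Step` type).
interface StepDef {
  name: string;
  parents?: string[];
}

// Returns step names in an order where every step comes after its parents.
// Throws if a parent is unknown or if the dependencies form a cycle.
function executionOrder(steps: StepDef[]): string[] {
  const byName = new Map<string, StepDef>();
  for (const s of steps) byName.set(s.name, s);

  const order: string[] = [];
  const state = new Map<string, "visiting" | "done">();

  const visit = (name: string): void => {
    const step = byName.get(name);
    if (!step) throw new Error(`unknown step: ${name}`);
    const current = state.get(name);
    if (current === "done") return;
    if (current === "visiting") throw new Error(`cycle at step: ${name}`);
    state.set(name, "visiting");
    // Visit all parents first so they land earlier in the order.
    for (const parent of step.parents ?? []) visit(parent);
    state.set(name, "done");
    order.push(name);
  };

  for (const s of steps) visit(s.name);
  return order;
}

const order = executionOrder([
  { name: "step2", parents: ["step1"] },
  { name: "step1" },
]);
console.log(order); // step1 is listed before step2
```

Even though `step2` is declared first in the array, it is ordered after `step1` because of its `parents` entry.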

## Getting Access to the Input Data
## Getting Access to the Workflow Input Data

You can get access to the workflow's input data simply by accessing the `input` parameter. You can . For example, given the following event:

```json
{
"name": "John"
}
```
You can get access to the workflow's input data simply by calling `ctx.workflowInput()`.

Here's an example `Step` which accesses the workflow input:

```ts
const stepPrintsInput: Step = {
name: "step2",
parents: ["step1"],
run: (input, ctx) => {
console.log("executed step2!", input["name"]);
run: (ctx) => {
console.log("executed step2!", ctx.workflowInput()["name"]);
},
};
```

Given the following event:

```json
{
"name": "John"
}
```

The console will log:

````
```
executed step2! John
``
```

## Step Outputs

@@ -74,46 +76,166 @@ Step outputs should be a of type `Record<string, any>`, should be `JSON` seriali
```ts
const stepReturnsData: Step = {
name: "step2",
run: (input, ctx) => {
run: (ctx) => {
return { awesome: "data" };
},
};
````
```

Future steps can access this output through the context (`ctx`) parameter by calling `ctx.stepOutput("<step_name>")`. In this example, a future step could access this data via `ctx.stepOutput("step2")`:

```ts
const futureStep: Step = {
name: "step3",
parents: ["step2"],
run: (ctx) => {
const uppercaseStep2 = ctx.stepOutput("step2")["awesome"].toUpperCase();
return { uppercase: uppercaseStep2 };
},
};
```

Future steps can access this output through the context (`ctx`) parameter `ctx.stepOutput("<step_name>")`. In this example, a future step could access this data via `context.stepOutput("step2")`. Remember, a step that depends on previous step data should include this dependency in the `parents` array.
Remember, a step that depends on previous step data should include this dependency in the `parents` array.

## Cron Schedules

You can declare a cron schedule by defining `on_crons` in the `Workflow` object. For example, to trigger a workflow every 5 minutes, you can do the following:

```go
import Hatchet from '@hatchet/sdk';
import { Workflow } from '@hatchet/workflow';
```ts
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";

const hatchet = Hatchet.init();

const workflow: Workflow = {
id: 'example',
description: 'test',
id: "example",
description: "test",
on: {
cron: "*/5 * * * *",
},
steps: [
{
name: 'step1',
name: "step1",
run: (ctx) => {
console.log('executed step1!');
return { step1: 'step1' };
console.log("executed step1!");
return { step1: "step1" };
},
},
{
name: 'step2',
parents: ['step1'],
name: "step2",
parents: ["step1"],
run: (ctx) => {
console.log('executed step2!', input);
return { step2: 'step2' };
console.log("executed step2!", ctx.workflowInput());
return { step2: "step2" };
},
},
],
};
```

## Concurrency Limits and Fairness

> **Note:** this feature is currently in beta, and currently only supports a concurrency strategy which terminates the oldest running workflow run to make room for the new one. This will be expanded in the future to support other strategies.

By default, there are no concurrency limits for Hatchet workflows. Workflow runs are executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by adding a `concurrency` configuration to your workflow declaration. This configuration includes a `key`, a function that returns a **concurrency group key**: a string used to group concurrent executions. **Note that this function should not also be used as a `hatchet.step`.** For example, the following workflow will only allow 5 concurrent executions for any run of the `concurrency-example` workflow, since the key is statically set to `concurrency-key`:

```ts
const workflow: Workflow = {
id: "concurrency-example",
description: "test",
on: {
event: "concurrency:create",
},
concurrency: {
name: "basic-concurrency",
maxRuns: 5,
key: (ctx) => "concurrency-key",
},
steps: [
{
name: "step1",
run: async (ctx) => {
const { data } = ctx.workflowInput();
const { signal } = ctx.controller;

if (signal.aborted) throw new Error("step1 was aborted");

console.log("starting step1 and waiting 5 seconds...", data);
await sleep(5000);

if (signal.aborted) throw new Error("step1 was aborted");

// NOTE: the AbortController signal can be passed to many http libraries to cancel active requests
// fetch(url, { signal })
// axios.get(url, { signal })

console.log("executed step1!");
return { step1: `step1 results for ${data}!` };
},
},
{
name: "step2",
parents: ["step1"],
run: (ctx) => {
console.log(
"executed step2 after step1 returned ",
ctx.stepOutput("step1")
);
return { step2: "step2 results!" };
},
},
],
};
```

### Cancellation Signalling

When a concurrent workflow is already running, Hatchet will send a cancellation signal to the step via its context. For now, you must handle this signal to exit the step at a logical point:

```ts
{
name: "step1",
run: async (ctx) => {
const { data } = ctx.workflowInput();
const { signal } = ctx.controller;

if (signal.aborted) throw new Error("step1 was aborted");

console.log("starting step1 and waiting 5 seconds...", data);
await sleep(5000);

if (signal.aborted) throw new Error("step1 was aborted");

// NOTE: the AbortController signal can be passed to many http libraries to cancel active requests
// fetch(url, { signal })
// axios.get(url, { signal })

console.log("executed step1!");
return { step1: `step1 results for ${data}!` };
},
},
```
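
The step above only checks `signal.aborted` before and after the wait, so a cancellation that arrives mid-sleep is not noticed until the full five seconds have passed. One possible refinement is a sleep helper that rejects as soon as the signal aborts. The sketch below uses only the standard `AbortController` and `AbortSignal` APIs; `abortableSleep` and `demo` are hypothetical names and are not part of the Hatchet SDK.

```ts
// Hypothetical helper: resolves after `ms`, or rejects early if `signal` aborts.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error("aborted before sleep started"));
      return;
    }

    // Called only if the signal aborts while the timer is pending.
    const onAbort = () => {
      clearTimeout(timer);
      reject(new Error("aborted during sleep"));
    };

    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);

    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Usage sketch: cancel a 5 second sleep after 50 ms.
async function demo(): Promise<string> {
  const controller = new AbortController();
  setTimeout(() => controller.abort(), 50);
  try {
    await abortableSleep(5000, controller.signal);
    return "completed";
  } catch (err) {
    return (err as Error).message;
  }
}

demo().then((result) => console.log(result));
```

Inside a step, and assuming `ctx.controller` is a standard `AbortController` as the destructuring in the example suggests, the wait could then be written as `await abortableSleep(5000, signal)`.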

### Use-Case: Enforcing Per-User Concurrency Limits

You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:

```ts
const workflow: Workflow = {
id: "concurrency-example",
description: "test",
on: {
event: "concurrency:create",
},
concurrency: {
name: "basic-concurrency",
maxRuns: 1,
key: (ctx) => ctx.workflowInput().userId,
},
// Rest of the workflow configuration
}
```

This same approach can be used for:

- Setting concurrency for a specific user session by `session_id` (e.g. multiple chat messages sent)
- Limiting data or document ingestion by setting an input hash or on-file key
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions
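
To make the grouping behavior concrete, here is a small in-memory sketch of the strategy described in the note above: runs are grouped by the string returned from the key function, and when a group is full, the oldest run in that group is cancelled to make room. `ConcurrencyGroups`, `Run`, and `EventInput` are hypothetical names used only for illustration; this is a model of the documented behavior, not how Hatchet is implemented.

```ts
// Hypothetical model of a running workflow run.
interface Run {
  id: string;
  controller: AbortController;
}

class ConcurrencyGroups<Input> {
  private groups = new Map<string, Run[]>();
  private maxRuns: number;
  private key: (input: Input) => string;

  constructor(maxRuns: number, key: (input: Input) => string) {
    this.maxRuns = maxRuns;
    this.key = key;
  }

  // Registers a new run and returns the ids of runs cancelled to make room.
  start(id: string, input: Input): string[] {
    const groupKey = this.key(input);
    const running = this.groups.get(groupKey) ?? [];
    const cancelled: string[] = [];

    // Cancel the oldest runs in this group until there is a free slot.
    while (running.length >= this.maxRuns) {
      const oldest = running.shift();
      if (!oldest) break;
      oldest.controller.abort();
      cancelled.push(oldest.id);
    }

    running.push({ id, controller: new AbortController() });
    this.groups.set(groupKey, running);
    return cancelled;
  }
}

type EventInput = { userId: string; data: string };

// One concurrent run per user, keyed by `userId`.
const groups = new ConcurrencyGroups<EventInput>(1, (input) => input.userId);

const first = groups.start("run-1", { userId: "user1", data: "event 1" });
const second = groups.start("run-2", { userId: "user1", data: "event 2" });
const third = groups.start("run-3", { userId: "user2", data: "event 3" });
console.log(first, second, third);
```

In this model the second event for `user1` cancels `run-1`, while the event for `user2` falls into a different group and cancels nothing.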
2 changes: 1 addition & 1 deletion frontend/docs/pages/home/typescript-sdk/pushing-events.mdx
@@ -3,7 +3,7 @@
Events can be pushed via the client's `hatchet.event.push()` method:

```ts
import Hatchet from "@hatchet/sdk";
import Hatchet from "@hatchet-dev/typescript-sdk";

const hatchet = Hatchet.init();

28 changes: 28 additions & 0 deletions typescript-sdk/examples/concurrency/concurrency-event.ts
@@ -0,0 +1,28 @@
import Hatchet from '../../src/sdk';

const hatchet = Hatchet.init();

const sleep = (ms: number) =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});

async function main() {
hatchet.event.push('concurrency:create', {
data: 'event 1',
userId: 'user1',
});

// step 1 will wait 5000 ms,
// so sending a second event
// before that will cancel
// the first run and run the second event
await sleep(1000);

hatchet.event.push('concurrency:create', {
data: 'event 2',
userId: 'user1',
});
}

main();