Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Feb 1, 2024
2 parents c93f727 + d6c14e1 commit 03b53bb
Show file tree
Hide file tree
Showing 119 changed files with 4,869 additions and 960 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ jobs:
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.21"

- uses: actions/cache@v3
with:
Expand Down
61 changes: 49 additions & 12 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ service Dispatcher {

rpc Listen(WorkerListenRequest) returns (stream AssignedAction) {}

rpc SendActionEvent(ActionEvent) returns (ActionEventResponse) {}
rpc SendStepActionEvent(StepActionEvent) returns (ActionEventResponse) {}

rpc SendGroupKeyActionEvent(GroupKeyActionEvent) returns (ActionEventResponse) {}

rpc Unsubscribe(WorkerUnsubscribeRequest) returns (WorkerUnsubscribeResponse) {}
}
Expand Down Expand Up @@ -39,35 +41,42 @@ message WorkerRegisterResponse {
enum ActionType {
START_STEP_RUN = 0;
CANCEL_STEP_RUN = 1;
START_GET_GROUP_KEY = 2;
}

message AssignedAction {
// the tenant id
string tenantId = 1;

// the workflow run id (optional)
string workflowRunId = 2;

// the get group key run id (optional)
string getGroupKeyRunId = 3;

// the job id
string jobId = 2;
string jobId = 4;

// the job name
string jobName = 3;
string jobName = 5;

// the job run id
string jobRunId = 4;
string jobRunId = 6;

// the step id
string stepId = 5;
string stepId = 7;

// the step run id
string stepRunId = 6;
string stepRunId = 8;

// the action id
string actionId = 7;
string actionId = 9;

// the action type
ActionType actionType = 8;
ActionType actionType = 10;

// the action payload
string actionPayload = 9;
string actionPayload = 11;
}

message WorkerListenRequest {
Expand All @@ -88,14 +97,42 @@ message WorkerUnsubscribeResponse {
string workerId = 2;
}

enum ActionEventType {
enum GroupKeyActionEventType {
GROUP_KEY_EVENT_TYPE_UNKNOWN = 0;
GROUP_KEY_EVENT_TYPE_STARTED = 1;
GROUP_KEY_EVENT_TYPE_COMPLETED = 2;
GROUP_KEY_EVENT_TYPE_FAILED = 3;
}

message GroupKeyActionEvent {
// the id of the worker
string workerId = 1;

// the id of the job
string workflowRunId = 2;

string getGroupKeyRunId = 3;

// the action id
string actionId = 4;

google.protobuf.Timestamp eventTimestamp = 5;

// the step event type
GroupKeyActionEventType eventType = 6;

// the event payload
string eventPayload = 7;
}

enum StepActionEventType {
STEP_EVENT_TYPE_UNKNOWN = 0;
STEP_EVENT_TYPE_STARTED = 1;
STEP_EVENT_TYPE_COMPLETED = 2;
STEP_EVENT_TYPE_FAILED = 3;
}

message ActionEvent {
message StepActionEvent {
// the id of the worker
string workerId = 1;

Expand All @@ -117,7 +154,7 @@ message ActionEvent {
google.protobuf.Timestamp eventTimestamp = 7;

// the step event type
ActionEventType eventType = 8;
StepActionEventType eventType = 8;

// the event payload
string eventPayload = 9;
Expand Down
14 changes: 14 additions & 0 deletions api-contracts/workflows/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ service WorkflowService {
rpc GetWorkflowByName(GetWorkflowByNameRequest) returns (Workflow);
rpc ListWorkflowsForEvent(ListWorkflowsForEventRequest) returns (ListWorkflowsResponse);
rpc DeleteWorkflow(DeleteWorkflowRequest) returns (Workflow);

}

message PutWorkflowRequest {
Expand All @@ -28,6 +29,19 @@ message CreateWorkflowVersionOpts {
repeated string cron_triggers = 5; // (optional) cron triggers for the workflow
repeated google.protobuf.Timestamp scheduled_triggers = 6; // (optional) scheduled triggers for the workflow
repeated CreateWorkflowJobOpts jobs = 7; // (required) the workflow jobs
WorkflowConcurrencyOpts concurrency = 8; // (optional) the workflow concurrency options
}

enum ConcurrencyLimitStrategy {
CANCEL_IN_PROGRESS = 0;
DROP_NEWEST = 1;
QUEUE_NEWEST = 2;
}

message WorkflowConcurrencyOpts {
string action = 1; // (required) the action id for getting the concurrency group
int32 max_runs = 2; // (optional) the maximum number of concurrent workflow runs, default 1
ConcurrencyLimitStrategy limit_strategy = 3; // (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
}

// CreateWorkflowJobOpts represents options to create a workflow job.
Expand Down
23 changes: 19 additions & 4 deletions cmd/hatchet-admin/cli/keyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,26 @@ func runCreateCloudKMSJWTKeyset() error {
return err
}

fmt.Println("Private EC256 Keyset:")
fmt.Println(string(privateEc256))
if encryptionKeyDir != "" {
// we write these as .key files so that they're gitignored by default
err = os.WriteFile(encryptionKeyDir+"/private_ec256.key", privateEc256, 0600)

if err != nil {
return err
}

fmt.Println("Public EC256 Keyset:")
fmt.Println(string(publicEc256))
err = os.WriteFile(encryptionKeyDir+"/public_ec256.key", publicEc256, 0600)

if err != nil {
return err
}
} else {
fmt.Println("Private EC256 Keyset:")
fmt.Println(string(privateEc256))

fmt.Println("Public EC256 Keyset:")
fmt.Println(string(publicEc256))
}

return nil
}
3 changes: 3 additions & 0 deletions cmd/hatchet-admin/cli/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func seedDev(repo repository.Repository, tenantId string) error {
Name: "Preview",
},
},
Concurrency: &repository.CreateWorkflowConcurrencyOpts{
Action: "test:concurrency",
},
Jobs: []repository.CreateWorkflowJobOpts{
{
Name: "job-name",
Expand Down
43 changes: 33 additions & 10 deletions cmd/hatchet-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (

"github.com/hatchet-dev/hatchet/internal/config/loader"
"github.com/hatchet-dev/hatchet/internal/services/admin"
"github.com/hatchet-dev/hatchet/internal/services/controllers/events"
"github.com/hatchet-dev/hatchet/internal/services/controllers/jobs"
"github.com/hatchet-dev/hatchet/internal/services/controllers/workflows"
"github.com/hatchet-dev/hatchet/internal/services/dispatcher"
"github.com/hatchet-dev/hatchet/internal/services/eventscontroller"
"github.com/hatchet-dev/hatchet/internal/services/grpc"
"github.com/hatchet-dev/hatchet/internal/services/heartbeat"
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
"github.com/hatchet-dev/hatchet/internal/services/jobscontroller"
"github.com/hatchet-dev/hatchet/internal/services/ticker"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
Expand Down Expand Up @@ -171,10 +172,10 @@ func startEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) {
if sc.HasService("eventscontroller") {
// create separate events controller process
go func() {
ec, err := eventscontroller.New(
eventscontroller.WithTaskQueue(sc.TaskQueue),
eventscontroller.WithRepository(sc.Repository),
eventscontroller.WithLogger(sc.Logger),
ec, err := events.New(
events.WithTaskQueue(sc.TaskQueue),
events.WithRepository(sc.Repository),
events.WithLogger(sc.Logger),
)

if err != nil {
Expand All @@ -193,10 +194,32 @@ func startEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) {
if sc.HasService("jobscontroller") {
// create separate jobs controller process
go func() {
jc, err := jobscontroller.New(
jobscontroller.WithTaskQueue(sc.TaskQueue),
jobscontroller.WithRepository(sc.Repository),
jobscontroller.WithLogger(sc.Logger),
jc, err := jobs.New(
jobs.WithTaskQueue(sc.TaskQueue),
jobs.WithRepository(sc.Repository),
jobs.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = jc.Start(ctx)

if err != nil {
errCh <- err
}
}()
}

if sc.HasService("workflowscontroller") {
// create separate jobs controller process
go func() {
jc, err := workflows.New(
workflows.WithTaskQueue(sc.TaskQueue),
workflows.WithRepository(sc.Repository),
workflows.WithLogger(sc.Logger),
)

if err != nil {
Expand Down
Loading

0 comments on commit 03b53bb

Please sign in to comment.