-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Nexus APIs to the Temporal Go SDK #89
Merged
+369
−0
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -237,38 +237,10 @@ SDK's selectors. | |
|
||
> NOTE: In the future, as the Go SDK adds support for typed futures, we will add a strongly typed variant of this API. | ||
|
||
```go | ||
package temporalnexus | ||
|
||
type Endpoint interface { | ||
// contains filtered or unexported methods | ||
} | ||
|
||
// NewNamedEndpoint creates an with targeted by name as registered in the endpoint registry. If the endpoint is renamed | ||
// calls refering to the old name will fail. | ||
func NewNamedEndpoint(name string) Endpoint | ||
|
||
// START: Potential future | ||
|
||
// NewEndpointFromID creates an endpoint targeted by a stable endpoint ID from the endpoint registry. | ||
func NewEndpointFromID(id string) Endpoint | ||
|
||
// NewEndpointFromSelectors creates an endpoint that is resolved by applying the given selectors. | ||
// For example {"env" "prod", "region": "us-east-1"}. | ||
func NewEndpointFromSelectors(selectors map[string]string) Endpoint | ||
|
||
// END: Potential future | ||
``` | ||
|
||
```go | ||
// NexusOperationOptions are options for starting a Nexus Operation from a Workflow. | ||
type NexusOperationOptions struct { | ||
ScheduleToCloseTimeout time.Duration | ||
// The endpoint to send this operation request to. Optional. If not provided the endpoint will be resolved in | ||
// the worker based on the given worker options NexusEndpointResolver function. | ||
// It is generally recommended to keep workflow code enviroment agnostic and only set this from a workflow in | ||
// ambiguous cases. | ||
Endpoint temporalnexus.Endpoint | ||
} | ||
|
||
// NexusOperationExecution is the result of [NexusOperationFuture.GetNexusOperationExecution]. | ||
|
@@ -288,9 +260,15 @@ type NexusOperationFuture interface { | |
GetNexusOperationExecution() Future | ||
} | ||
|
||
// ExecuteNexusOperation executes a Nexus Operation. | ||
// The operation argument can be a string, a [nexus.Operation] or a [nexus.OperationReference]. | ||
func ExecuteNexusOperation(ctx Context, service string, operation any, input any, options NexusOperationOptions) NexusOperationFuture | ||
// NexusClient is a client for executing Nexus Operations from a workflow. | ||
type NexusClient interface { | ||
// ExecuteOperation executes a Nexus Operation. | ||
// The operation argument can be a string, a [nexus.Operation] or a [nexus.OperationReference]. | ||
ExecuteOperation(ctx Context, operation any, input any, options NexusOperationOptions) NexusOperationFuture | ||
} | ||
|
||
// Create a [NexusClient] from a service name and an endpoint name. | ||
func NewNexusClient(service, endpoint string) NexusClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also considering adding context and an options struct to future proof this API. |
||
``` | ||
|
||
**Usage**: | ||
|
@@ -301,15 +279,12 @@ import ( | |
"time" | ||
|
||
"go.temporal.io/sdk/workflow" | ||
"go.temporal.io/sdk/temporalnexus" | ||
) | ||
|
||
func MyCallerWorkflow(ctx workflow.Context) (MyOutput, error) { | ||
fut := workflow.ExecuteNexusOperation(ctx, "payments", "start-transaction", MyInput{ID: "tx-deadbeef"}, workflow.NexusOperationOptions{ | ||
client := workflow.NewNexusClient("payments", "prod-payments") | ||
fut := client.ExecuteOperation(ctx, "start-transaction", MyInput{ID: "tx-deadbeef"}, workflow.NexusOperationOptions{ | ||
ScheduleToCloseTimeout: time.Hour, | ||
// Optional, if not provided the endpoint will be resolved in the worker based on the given worker | ||
// options. | ||
Endpoint: temporalnexus.NewNamedEndpoint("prod-payments"), | ||
}) | ||
var exec workflow.NexusOperationExecution | ||
_ = fut.GetNexusOperationExecution().Get(ctx, &exec) | ||
|
@@ -321,30 +296,14 @@ func MyCallerWorkflow(ctx workflow.Context) (MyOutput, error) { | |
|
||
> NOTE: To cancel a Nexus Operation, cancel the context used to execute it. | ||
|
||
### Endpoint Registration in the Worker | ||
|
||
To avoid injecting configuration into the workflow - which is inherently unsafe and could break determinism - the SDK | ||
provides a worker option to resolve a service to an endpoint. | ||
|
||
> NOTE: This could also be implemented in an interceptor but for consistency with all SDKs, including ones where | ||
> interceptors run in the sandboxed workflow environment, this functionality has been extracted out to the worker. | ||
|
||
```go | ||
opts := worker.Options{ | ||
NexusEndpointResolver: func(ctx workflow.Context, service string) temporalnexus.Endpoint { | ||
return temporalnexus.NewNamedEndpoint("prod-" + service) | ||
} | ||
} | ||
``` | ||
|
||
### Interceptors | ||
|
||
For now we'll only intercept outbound APIs from a workflow, extending the `WorkflowOutboundInterceptor` interface. | ||
More interceptors are likely to come later. | ||
|
||
```go | ||
type WorkflowOutboundInterceptor interface { | ||
ScheduleNexusOperation(ctx Context, service, operation string, input any, options NexusOperationOptions) NexusOperationFuture | ||
NewNexusClient(service, endpoint string) workflow.NexusClient | ||
RequestCancelNexusOperation(ctx Context, service, operation, id string, options nexus.CancelOperationOptions) error | ||
} | ||
``` | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider reversing the order to endpoint, service.