Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coroutines #5

Merged
merged 75 commits into from
Jun 23, 2024
Merged

Coroutines #5

merged 75 commits into from
Jun 23, 2024

Conversation

chriso
Copy link
Contributor

@chriso chriso commented Jun 14, 2024

Following on from #2, this PR integrates https://github.com/dispatchrun/coroutine.

Dispatch functions are now coroutines. They're identical in terms of their signature and usage (accepting I any and returning O any), the difference is that they're able to yield control to Dispatch during execution.

Yielding to Dispatch

A new helper function is available for yielding control to Dispatch:

Yield(Response) Request

This Yield function can be called anywhere in the call stack of a coroutine.

The SDK automatically suspends coroutines at these yield points, returning the Response to Dispatch. When Dispatch responds with a new Request, the SDK takes care of resuming execution from the point the coroutine was suspended at.

Await & Gather

Users probably won't want/need to use the Yield function directly, and may instead prefer one of the higher-level helpers.

An Await function is provided for awaiting the results of one or more calls:

Await(strategy=AwaitAll|AwaitAny, calls ...Call) ([]CallResult, error)

Internally, Await submits calls to Dispatch and then continuously polls until results are available. The strategy controls how the function handles failure (should it wait for all results but return as soon as any call fails? or return as soon as any call succeeds? this is analogous to Promise.all vs. Promise.any from JavaScript).

Higher-level methods have also been added to the function/coroutine instances, allowing users to asynchronously make calls and await their result(s). These helper methods have input and output types that match the function/coroutine:

stringify := dispatch.Func("stringify", func(ctx context.Context, n int) (string, error) {
	return strconv.Itoa(n), nil
})

// The following functions are now available on the function instance:
stringify.Await(int, ...CallOption) (string, error)
stringify.Gather([]int, ...CallOption) ([]string, error)

Here's an example showing how functions can be composed using the new helpers:

stringify := dispatch.Func("stringify", func(ctx context.Context, n int) (string, error) {
	return strconv.Itoa(n), nil
})

double := dispatch.Func("double", func(ctx context.Context, n int) (int, error) {
	return n * 2, nil
})

doubleAndRepeat := dispatch.Func("double-repeat", func(ctx context.Context, n int) (string, error) {
	doubled, err := double.Await(n)
	if err != nil {
		return "", err
	}
	stringified, err := stringify.Await(doubled)
	if err != nil {
		return "", err
	}
	return strings.Repeat(stringified, doubled), nil
})

Durable vs. Volatile Coroutines

Coroutines provided by https://github.com/dispatchrun/coroutine can be run in two modes: volatile mode, where coroutines are suspended in memory using a channel, and durable mode where the coroutines are unwound and can be serialized. Durable coroutines require an extra compilation step: see the coroc compiler from https://github.com/dispatchrun/coroutine?tab=readme-ov-file#durable-coroutines.

In durable mode, the SDK takes care of serializing a coroutine and sending its serialized state back to Dispatch. In this mode, the coroutine can be resumed in another location. In volatile mode, the SDK instead sends a reference to a suspended coroutine back to Dispatch. The coroutine cannot be resumed in another location. Volatile coroutines are still useful when unit testing coroutines. A dispatchtest.Run helper has been provided for running a function/coroutine (and any nested functions/coroutines) entirely in memory.

@chriso
Copy link
Contributor Author

chriso commented Jun 20, 2024

@achille-roussel I merged the function/coroutine implementations into one Coroutine[I, O] impl, constructed via dispatch.Func(...).

I split out two packages:

  1. a dispatchproto package that contains the protobuf struct wrappers, and the constructors and options we use to build them
  2. a dispatchcoro package containing the lower level coroutine helpers (Yield, Await, Serialize, etc.)

The top-level exports are looking better now:

$ go doc .
package dispatch // import "github.com/dispatchrun/dispatch-go"

var ErrTimeout error = dispatchproto.StatusError(dispatchproto.TimeoutStatus) ...
func Env(env ...string) interface{ ... }
func New(opts ...DispatchOption) (*Dispatch, error)
func NewClient(opts ...ClientOption) (*Client, error)
type Batch struct{ ... }
type Call = dispatchproto.Call
type Client struct{ ... }
type ClientOption interface{ ... }
    func APIKey(apiKey string) ClientOption
    func APIUrl(apiUrl string) ClientOption
type Coroutine[I, O any] struct{ ... }
    func Func[I, O any](name string, fn func(context.Context, I) (O, error)) *Coroutine[I, O]
type Dispatch struct{ ... }
type DispatchOption interface{ ... }
    func EndpointUrl(endpointUrl string) DispatchOption
    func ServeAddress(addr string) DispatchOption
    func VerificationKey(verificationKey string) DispatchOption
type Function interface{ ... }
type ID = dispatchproto.ID
type PrimitiveFunction struct{ ... }
    func PrimitiveFunc(name string, ...) *PrimitiveFunction
type Registry struct{ ... }

Users shouldn't have to interact with the two new packages much (if at all). They can create functions and then use/compose them using the provided methods on Coroutine[I, O] (e.g. see the PR description).

If they want more control, or need to do something lower level (like a tail call), or want to do something where it's hard for us to provide a generic API (e.g. awaiting the results of calls with different input/output types) then they can use the packages directly.

It's difficult to break down the dispatchproto package further because of how coupled everything is there (the builder pattern we're using means that the types implement many other {OtherType}Option interfaces).

We could possibly split out a status package (e.g. status.OK vs. dispatchproto.OKStatus), and an object package (e.g. object.Int(...) object.Any vs. dispatchproto.Int(...) dispatchproto.Any, but again the user shouldn't be interacting with statuses and boxed inputs/outputs in most cases so I'm not sure it's worth burning two generic words. We also have Status or Any implement the option interfaces of the other proto wrappers, so would need extra indirection when building other structs that embed one of these types.

I toyed with using the name dispatchwire instead of dispatchproto since it lines up better with dispatchtest and dispatchcoro. I might change it still.

Let me know what you think.

@chriso chriso requested a review from achille-roussel June 20, 2024 03:11
client.go Outdated Show resolved Hide resolved
coroutine.go Outdated Show resolved Hide resolved
coroutine.go Outdated
Comment on lines 202 to 216
// Close closes the coroutine.
//
// In volatile mode, Close destroys all running instances of the coroutine.
// In durable mode, Close is a noop.
func (c *Coroutine[I, O]) Close() error {
c.mu.Lock()
defer c.mu.Unlock()

for _, fn := range c.instances {
fn.Stop()
fn.Next()
}
clear(c.instances)
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this useful outside of tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, yes. There are no restrictions on whether volatile or durable coroutines can be registered with a Dispatch endpoint. You might want to test volatile coroutines with the Dispatch CLI, for example, in which case the tear down is still applicable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see if I can rework some of this.

coroutine.go Outdated Show resolved Hide resolved
coroutine.go Outdated Show resolved Hide resolved
chriso and others added 13 commits June 21, 2024 09:34
Co-authored-by: Achille <[email protected]>
Co-authored-by: Achille <[email protected]>
Rename Coroutine[I,O] to Function[I,O] and keep
the concept of coroutines an implementation detail.

Rename the old Function interface to AnyFunction,
and hide the exported methods from users.

Remove the ability to execute a function via Run,
instead forcing users to use dispatchtest.Run(..).
@chriso
Copy link
Contributor Author

chriso commented Jun 21, 2024

@achille-roussel I had another round of simplifications:

  • I factored out a dispatchclient package. Most users won't need to configure a client manually; there's no need to pollute the top-level namespace. It also means we can drop the prefix/suffix on some types/functions, e.g. dispatch.DispatchOption => dispatch.Option and dispatch.ClientOption => dispatchclient.Option and dispatch.NewClient => dispatchclient.New
  • I renamed the Function interface to AnyFunction, and removed the Run method and internal bind hook in favor of a single Register(*Dispatch) (name, dispatchproto.Function) method
  • I renamed Coroutine[I, O] => Function[I, O] now that the Function name was free. Coroutines are an implementation detail; there's no need to confuse the user with the term
  • I removed the top-level PrimitiveFunction. It's still possible to register primitive functions, via a new Dispatch.RegisterPrimitive function. You lose the convenience methods (e.g. BuildCall/Dispatch/Await), however it's one less thing to wrap your head around when looking at the top-level namespace. That means that AnyFunction is now the base interface for all Function[I, O] instances (only).
  • I removed the function registry we were using to route requests by name to a set of functions. Instead, the various components that need to do routing (dispatchlambda.Handler, dispatchtest.Run, Dispatch endpoints) all internally construct and use a lower-level dispatchproto.FunctionMap. All the user has to do is provide one or more functions (...AnyFunction) to these components

Here's what we have:

package dispatch // import "github.com/dispatchrun/dispatch-go"

var ErrTimeout error = dispatchproto.StatusError(dispatchproto.TimeoutStatus) ...
func New(opts ...Option) (*Dispatch, error)
type AnyFunction interface{ ... }
type Dispatch struct{ ... }
type Function[I, O any] struct{ ... }
    func Func[I, O any](name string, fn func(context.Context, I) (O, error)) *Function[I, O]
type Option func(d *Dispatch)
    func Client(client *dispatchclient.Client) Option
    func EndpointUrl(endpointUrl string) Option
    func Env(env ...string) Option
    func ServeAddress(addr string) Option
    func VerificationKey(verificationKey string) Option
func (d *Dispatch) Client() (*dispatchclient.Client, error)
func (d *Dispatch) Handler() (string, http.Handler)
func (d *Dispatch) Register(fn AnyFunction)
func (d *Dispatch) RegisterPrimitive(name string, fn dispatchproto.Function)
func (d *Dispatch) Serve() error
func (d *Dispatch) URL() string
func Func[I, O any](name string, fn func(context.Context, I) (O, error)) *Function[I, O]

func (f *Function[I, O]) Name() string
func (f *Function[I, O]) BuildCall(input I, opts ...dispatchproto.CallOption) (dispatchproto.Call, error)
func (f *Function[I, O]) Dispatch(ctx context.Context, input I, opts ...dispatchproto.CallOption) (dispatchproto.ID, error)
func (f *Function[I, O]) Await(input I, opts ...dispatchproto.CallOption) (O, error)
func (c *Function[I, O]) Gather(inputs []I, opts ...dispatchproto.CallOption) ([]O, error)
func (f *Function[I, O]) Register(endpoint *Dispatch) (string, dispatchproto.Function)

type AnyFunction interface { Register(*Dispatch) (string, dispatchproto.Function) }
package dispatchclient // import "github.com/dispatchrun/dispatch-go/dispatchclient"

type Batch struct{ ... }
type Client struct{ ... }
    func New(opts ...Option) (*Client, error)
type Option func(*Client)
    func APIKey(apiKey string) Option
    func APIUrl(apiUrl string) Option
    func Env(env ...string) Option

Let me know what you think.

@chriso chriso requested a review from achille-roussel June 21, 2024 03:24
Copy link
Contributor

@achille-roussel achille-roussel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like how lean the API has gotten in the latest changes, nice work 👏

dispatch.go Outdated Show resolved Hide resolved
dispatchproto/proto.go Outdated Show resolved Hide resolved
@chriso chriso merged commit 44a121b into main Jun 23, 2024
2 checks passed
@chriso chriso deleted the coroutine branch June 23, 2024 21:19
@chriso chriso self-assigned this Jun 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants