Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coroutines #5

Merged
merged 75 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
cc71c81
Bump dispatch-proto
chriso Jun 14, 2024
ea2bece
Use typed coroutine state
chriso Jun 14, 2024
982a39e
Factor out helpers for use in the next layer up
chriso Jun 14, 2024
3f36dcc
Integrate coroutine library, and get simple functions working
chriso Jun 14, 2024
fccfb36
Implement suspend/resume
chriso Jun 14, 2024
4c8b57a
No need for manual noinline directive
chriso Jun 14, 2024
e1510bf
coroc struggles with nested NewFunction call; avoid it for now
chriso Jun 14, 2024
4f0cfef
Setup serde for proto.Message
chriso Jun 14, 2024
92df977
Add custom serializers for Dispatch/Client
chriso Jun 17, 2024
255c1af
Ignore warning about copy of mutex
chriso Jun 17, 2024
8c2c65c
Fix Dispatch deser
chriso Jun 17, 2024
e593364
Need to store type URL alongside serialized proto.Message
chriso Jun 17, 2024
975e2ba
Tidy up the coroutine after response is generated
chriso Jun 17, 2024
9613050
Factor out a component to hold a collection of functions
chriso Jun 18, 2024
443ab9d
Rework internals in prep for test runner
chriso Jun 18, 2024
141f30e
Disallow registration of volatile coroutines for now
chriso Jun 18, 2024
0285ca0
Sketch out volatile execution mode
chriso Jun 18, 2024
78bba6b
Helper for constructing exit directives with an output value
chriso Jun 18, 2024
7f863e8
Test exit directive
chriso Jun 18, 2024
4d69531
Simplify
chriso Jun 18, 2024
0223560
Improve internal docs
chriso Jun 18, 2024
4d24e28
Remove unused interfaces
chriso Jun 18, 2024
089f2cc
Fix adding coroutine state to a response
chriso Jun 18, 2024
d3b6e7c
Status is optional when constructing responses
chriso Jun 18, 2024
bfadd59
Input/Output option funcs are optional; infer from how Any is used
chriso Jun 18, 2024
f39c38f
Clarify what's happening when creating response
chriso Jun 18, 2024
f82221a
Test poll
chriso Jun 18, 2024
162ab6b
Avoid range over int; no need to exclude older Go versions
chriso Jun 18, 2024
8562188
Remove id.go
chriso Jun 18, 2024
5e0d68e
Continue to make it easier working with proto wrappers
chriso Jun 18, 2024
4fa5ba4
Build await/gather
chriso Jun 18, 2024
578946b
Add more comments to help readers
chriso Jun 19, 2024
dd14f9d
Implement and test gather
chriso Jun 19, 2024
5050c9b
No need for pointer receiver on gRPC handler
chriso Jun 19, 2024
60e069c
Test random delivery of call results
chriso Jun 19, 2024
9cd38f8
Extract a Gather[O] helper
chriso Jun 19, 2024
e39b14c
Implement dispatchtest.Run
chriso Jun 19, 2024
5db758e
Write an integration test
chriso Jun 19, 2024
c8eb42f
Change make command name so it's easier to read
chriso Jun 19, 2024
6fb2e5d
This needs to be a coroutine
chriso Jun 19, 2024
ebbe731
Bump all deps
chriso Jun 19, 2024
dc4be01
Run the integration test in volatile mode first
chriso Jun 19, 2024
bc44ad4
Merge branch 'main' into coroutine
chriso Jun 20, 2024
f4b242f
Add a couple more helpers
chriso Jun 20, 2024
9b9a092
Move the runnable interface
chriso Jun 20, 2024
8f002a3
Downgrade the panic to a warning
chriso Jun 20, 2024
6632b20
Merge GenericFunction and GenericCoroutine
chriso Jun 20, 2024
43ae3e3
Make volatile coroutines work a little better
chriso Jun 20, 2024
aa787bb
Use shorter function constructors
chriso Jun 20, 2024
ebd1de9
Factor out a dispatchproto package
chriso Jun 20, 2024
100daba
Factor out a dispatchcoro package
chriso Jun 20, 2024
6b35737
Tidy up coroutine impl
chriso Jun 20, 2024
6008b2a
Remove the volatile coroutine warning
chriso Jun 20, 2024
e33f8f4
Tweak dispatchtest API to avoid dispatchproto
chriso Jun 20, 2024
750ec98
No need to export default API URL
chriso Jun 20, 2024
c12f841
Return an error if the response status isn't OK
chriso Jun 20, 2024
5210c6b
Update coroutine.go
chriso Jun 20, 2024
4061aa6
Update coroutine.go
chriso Jun 20, 2024
5c0638d
Drop the type aliases for ID/Call
chriso Jun 20, 2024
6e268d0
NewCall => BuildCall
chriso Jun 20, 2024
7639b4a
Remove the need to close PrimitiveFunction/Registry
chriso Jun 21, 2024
8722d65
Rework exports
chriso Jun 21, 2024
d91dd0c
Remove the need to close functions
chriso Jun 21, 2024
7ad2705
Closing an endpoint closes the registry and any registered coroutines
chriso Jun 21, 2024
aeddcee
Move registry
chriso Jun 21, 2024
2c29d74
Factor out a dispatchclient package
chriso Jun 21, 2024
ba06fc3
Don't have Function depend on PrimitiveFunction
chriso Jun 21, 2024
a4dba56
Simplify how primitive functions are registered
chriso Jun 21, 2024
d77d264
Simplify the dispatchtest package
chriso Jun 21, 2024
54d4fbf
Simplify Option interface
chriso Jun 21, 2024
1a2d935
Fix docs
chriso Jun 21, 2024
d49cb23
Isolate the set of volatile coroutine instances
chriso Jun 21, 2024
d9e5ecc
Simplify further
chriso Jun 21, 2024
7750571
Move more internal logic to dispatchcoro
chriso Jun 21, 2024
0010d75
PR feedback
chriso Jun 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*_durable.go
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: fmt lint test
.PHONY: clean coroc fmt lint test integration-test clean coroc

fmt:
go fmt ./...
Expand All @@ -8,3 +8,16 @@ lint:

test:
go test ./...

integration-test: clean coroc
go run ./dispatchtest/integration # volatile mode
coroc ./dispatchtest/integration
go run -tags durable ./dispatchtest/integration # durable mode

clean:
find . -name '*_durable.go' -delete

coroc:
@which coroc &>/dev/null \
|| echo "Installing coroc..." \
&& go install github.com/dispatchrun/coroutine/compiler/cmd/coroc@latest
127 changes: 69 additions & 58 deletions dispatch.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build !durable

package dispatch

import (
Expand All @@ -10,12 +12,16 @@ import (
"os"
"strings"
"sync"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go/dispatchclient"
"github.com/dispatchrun/dispatch-go/dispatchproto"
"github.com/dispatchrun/dispatch-go/internal/auth"
"github.com/dispatchrun/dispatch-go/internal/env"
)

// Dispatch is a Dispatch endpoint.
Expand All @@ -24,31 +30,33 @@ type Dispatch struct {
verificationKey string
serveAddr string
env []string
opts []Option

client *Client
client *dispatchclient.Client
clientErr error

path string
handler http.Handler

functions map[string]Function
functions dispatchproto.FunctionMap
mu sync.Mutex
}

// New creates a Dispatch endpoint.
func New(opts ...DispatchOption) (*Dispatch, error) {
func New(opts ...Option) (*Dispatch, error) {
d := &Dispatch{
env: os.Environ(),
functions: map[string]Function{},
opts: opts,
functions: map[string]dispatchproto.Function{},
}
for _, opt := range opts {
opt.configureDispatch(d)
opt(d)
}

// Prepare the endpoint URL.
var endpointUrlFromEnv bool
if d.endpointUrl == "" {
d.endpointUrl = getenv(d.env, "DISPATCH_ENDPOINT_URL")
d.endpointUrl = env.Get(d.env, "DISPATCH_ENDPOINT_URL")
endpointUrlFromEnv = true
}
if d.endpointUrl == "" {
Expand All @@ -64,7 +72,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {

// Prepare the address to serve on.
if d.serveAddr == "" {
d.serveAddr = getenv(d.env, "DISPATCH_ENDPOINT_ADDR")
d.serveAddr = env.Get(d.env, "DISPATCH_ENDPOINT_ADDR")
if d.serveAddr == "" {
d.serveAddr = "127.0.0.1:8000"
}
Expand All @@ -73,7 +81,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {
// Prepare the verification key.
var verificationKeyFromEnv bool
if d.verificationKey == "" {
d.verificationKey = getenv(d.env, "DISPATCH_VERIFICATION_KEY")
d.verificationKey = env.Get(d.env, "DISPATCH_VERIFICATION_KEY")
verificationKeyFromEnv = true
}
var verificationKey ed25519.PublicKey
Expand All @@ -93,8 +101,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {
if err != nil {
return nil, err
}
grpcHandler := &dispatchFunctionServiceHandler{d}
d.path, d.handler = sdkv1connect.NewFunctionServiceHandler(grpcHandler, connect.WithInterceptors(validator))
d.path, d.handler = sdkv1connect.NewFunctionServiceHandler(dispatchHandler{d}, connect.WithInterceptors(validator))

// Setup request signature validation.
if verificationKey == nil {
Expand All @@ -109,29 +116,21 @@ func New(opts ...DispatchOption) (*Dispatch, error) {

// Optionally attach a client.
if d.client == nil {
d.client, d.clientErr = NewClient(Env(d.env...))
d.client, d.clientErr = dispatchclient.New(dispatchclient.Env(d.env...))
}

return d, nil
}

// DispatchOption configures a Dispatch endpoint.
type DispatchOption interface {
configureDispatch(d *Dispatch)
}

type dispatchOptionFunc func(d *Dispatch)

func (fn dispatchOptionFunc) configureDispatch(d *Dispatch) {
fn(d)
}
// Option configures a Dispatch endpoint.
type Option func(*Dispatch)

// EndpointUrl sets the URL of the Dispatch endpoint.
//
// It defaults to the value of the DISPATCH_ENDPOINT_URL environment
// variable.
func EndpointUrl(endpointUrl string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.endpointUrl = endpointUrl })
func EndpointUrl(endpointUrl string) Option {
return func(d *Dispatch) { d.endpointUrl = endpointUrl }
}

// VerificationKey sets the verification key to use when verifying
Expand All @@ -144,8 +143,8 @@ func EndpointUrl(endpointUrl string) DispatchOption {
//
// If a verification key is not provided, request signatures will
// not be validated.
func VerificationKey(verificationKey string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.verificationKey = verificationKey })
func VerificationKey(verificationKey string) Option {
return func(d *Dispatch) { d.verificationKey = verificationKey }
}

// ServeAddress sets the address that the Dispatch endpoint
Expand All @@ -157,28 +156,41 @@ func VerificationKey(verificationKey string) DispatchOption {
// It defaults to the value of the DISPATCH_ENDPOINT_ADDR environment
// variable, which is automatically set by the Dispatch CLI. If this
// is unset, it defaults to 127.0.0.1:8000.
func ServeAddress(addr string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.serveAddr = addr })
func ServeAddress(addr string) Option {
return func(d *Dispatch) { d.serveAddr = addr }
}

// Register registers a function.
func (d *Dispatch) Register(fn Function) {
d.mu.Lock()
defer d.mu.Unlock()
// Env sets the environment variables that a Dispatch endpoint
// parses its default configuration from.
//
// It defaults to os.Environ().
func Env(env ...string) Option {
return func(d *Dispatch) { d.env = env }
}

d.functions[fn.Name()] = fn
// Client sets the client to use when dispatching calls
// from functions registered on the endpoint.
//
// By default the Dispatch endpoint will attempt to construct
// a dispatchclient.Client instance using the DISPATCH_API_KEY
// and optional DISPATCH_API_URL environment variables. If more
// control is required over client configuration, the custom
// client instance can be registered here and used instead.
func Client(client *dispatchclient.Client) Option {
return func(d *Dispatch) { d.client = client }
}

// Bind the function to this endpoint, so that the function's
// NewCall and Dispatch methods can be used to build and
// dispatch calls.
fn.bind(d)
// Register registers a function.
func (d *Dispatch) Register(fn AnyFunction) {
d.RegisterPrimitive(fn.Register(d))
}

func (d *Dispatch) lookupFunction(name string) Function {
// RegisterPrimitive registers a primitive function.
func (d *Dispatch) RegisterPrimitive(name string, fn dispatchproto.Function) {
d.mu.Lock()
defer d.mu.Unlock()

return d.functions[name]
d.functions[name] = fn
}

// URL is the URL of the Dispatch endpoint.
Expand All @@ -193,29 +205,12 @@ func (d *Dispatch) Handler() (string, http.Handler) {
}

// Client returns the Client attached to this endpoint.
func (d *Dispatch) Client() (*Client, error) {
func (d *Dispatch) Client() (*dispatchclient.Client, error) {
return d.client, d.clientErr
}

// The gRPC handler is unexported so that the http.Handler can
// be wrapped in order to validate request signatures.
type dispatchFunctionServiceHandler struct {
dispatch *Dispatch
}

func (d *dispatchFunctionServiceHandler) Run(ctx context.Context, req *connect.Request[sdkv1.RunRequest]) (*connect.Response[sdkv1.RunResponse], error) {
var res Response
fn := d.dispatch.lookupFunction(req.Msg.Function)
if fn == nil {
res = NewResponseErrorf("%w: function %q not found", ErrNotFound, req.Msg.Function)
} else {
res = fn.Run(ctx, Request{req.Msg})
}
return connect.NewResponse(res.proto), nil
}

// Serve serves the Dispatch endpoint.
func (d *Dispatch) Serve() error {
// ListenAndServe serves the Dispatch endpoint.
func (d *Dispatch) ListenAndServe() error {
mux := http.NewServeMux()
mux.Handle(d.Handler())

Expand All @@ -224,3 +219,19 @@ func (d *Dispatch) Serve() error {
server := &http.Server{Addr: d.serveAddr, Handler: mux}
return server.ListenAndServe()
}

// The gRPC handler is deliberately unexported. This forces
// the user to access it through Dispatch.Handler, and get
// a handler that has signature verification middleware attached.
type dispatchHandler struct{ dispatch *Dispatch }

func (d dispatchHandler) Run(ctx context.Context, req *connect.Request[sdkv1.RunRequest]) (*connect.Response[sdkv1.RunResponse], error) {
res := d.dispatch.functions.Run(ctx, newProtoRequest(req.Msg))
return connect.NewResponse(responseProto(res)), nil
}

//go:linkname newProtoRequest github.com/dispatchrun/dispatch-go/dispatchproto.newProtoRequest
func newProtoRequest(r *sdkv1.RunRequest) dispatchproto.Request

//go:linkname responseProto github.com/dispatchrun/dispatch-go/dispatchproto.responseProto
func responseProto(r dispatchproto.Response) *sdkv1.RunResponse
Loading
Loading