diff --git a/dispatch.go b/dispatch.go index 75448ee..c90c463 100644 --- a/dispatch.go +++ b/dispatch.go @@ -17,8 +17,10 @@ import ( sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1" "connectrpc.com/connect" "connectrpc.com/validate" + "github.com/dispatchrun/dispatch-go/dispatchclient" "github.com/dispatchrun/dispatch-go/dispatchproto" "github.com/dispatchrun/dispatch-go/internal/auth" + "github.com/dispatchrun/dispatch-go/internal/env" ) // Dispatch is a Dispatch endpoint. @@ -27,9 +29,9 @@ type Dispatch struct { verificationKey string serveAddr string env []string - opts []DispatchOption + opts []Option - client *Client + client *dispatchclient.Client clientErr error path string @@ -39,19 +41,19 @@ type Dispatch struct { } // New creates a Dispatch endpoint. -func New(opts ...DispatchOption) (*Dispatch, error) { +func New(opts ...Option) (*Dispatch, error) { d := &Dispatch{ env: os.Environ(), opts: opts, } for _, opt := range opts { - opt.configureDispatch(d) + opt(d) } // Prepare the endpoint URL. var endpointUrlFromEnv bool if d.endpointUrl == "" { - d.endpointUrl = getenv(d.env, "DISPATCH_ENDPOINT_URL") + d.endpointUrl = env.Get(d.env, "DISPATCH_ENDPOINT_URL") endpointUrlFromEnv = true } if d.endpointUrl == "" { @@ -67,7 +69,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) { // Prepare the address to serve on. if d.serveAddr == "" { - d.serveAddr = getenv(d.env, "DISPATCH_ENDPOINT_ADDR") + d.serveAddr = env.Get(d.env, "DISPATCH_ENDPOINT_ADDR") if d.serveAddr == "" { d.serveAddr = "127.0.0.1:8000" } @@ -76,7 +78,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) { // Prepare the verification key. var verificationKeyFromEnv bool if d.verificationKey == "" { - d.verificationKey = getenv(d.env, "DISPATCH_VERIFICATION_KEY") + d.verificationKey = env.Get(d.env, "DISPATCH_VERIFICATION_KEY") verificationKeyFromEnv = true } var verificationKey ed25519.PublicKey @@ -111,29 +113,21 @@ func New(opts ...DispatchOption) (*Dispatch, error) { // Optionally attach a client. if d.client == nil { - d.client, d.clientErr = NewClient(Env(d.env...)) + d.client, d.clientErr = dispatchclient.New(dispatchclient.Env(d.env...)) } return d, nil } -// DispatchOption configures a Dispatch endpoint. -type DispatchOption interface { - configureDispatch(d *Dispatch) -} - -type dispatchOptionFunc func(d *Dispatch) - -func (fn dispatchOptionFunc) configureDispatch(d *Dispatch) { - fn(d) -} +// Option configures a Dispatch endpoint. +type Option func(d *Dispatch) // EndpointUrl sets the URL of the Dispatch endpoint. // // It defaults to the value of the DISPATCH_ENDPOINT_URL environment // variable. -func EndpointUrl(endpointUrl string) DispatchOption { - return dispatchOptionFunc(func(d *Dispatch) { d.endpointUrl = endpointUrl }) +func EndpointUrl(endpointUrl string) Option { + return func(d *Dispatch) { d.endpointUrl = endpointUrl } } // VerificationKey sets the verification key to use when verifying @@ -146,8 +140,8 @@ func EndpointUrl(endpointUrl string) DispatchOption { // // If a verification key is not provided, request signatures will // not be validated. -func VerificationKey(verificationKey string) DispatchOption { - return dispatchOptionFunc(func(d *Dispatch) { d.verificationKey = verificationKey }) +func VerificationKey(verificationKey string) Option { + return func(d *Dispatch) { d.verificationKey = verificationKey } } // ServeAddress sets the address that the Dispatch endpoint @@ -159,8 +153,28 @@ func VerificationKey(verificationKey string) DispatchOption { // It defaults to the value of the DISPATCH_ENDPOINT_ADDR environment // variable, which is automatically set by the Dispatch CLI. If this // is unset, it defaults to 127.0.0.1:8000. -func ServeAddress(addr string) DispatchOption { - return dispatchOptionFunc(func(d *Dispatch) { d.serveAddr = addr }) +func ServeAddress(addr string) Option { + return func(d *Dispatch) { d.serveAddr = addr } +} + +// Env sets the environment variables that a Dispatch endpoint +// parses its default configuration from. +// +// It defaults to os.Environ(). +func Env(env ...string) Option { + return func(d *Dispatch) { d.env = env } +} + +// Client sets the client to use when dispatching calls +// from functions registered on the endpoint. +// +// By default the Dispatch endpoint will attempt to construct +// a dispatchclient.Client instance using the DISPATCH_API_KEY +// and optional DISPATCH_API_URL environment variables. If more +// control is required over client configuration, the custom +// client instance can be registered here and used instead. +func Client(client *dispatchclient.Client) Option { + return func(d *Dispatch) { d.client = client } } // Register registers a function. @@ -185,7 +199,7 @@ func (d *Dispatch) Handler() (string, http.Handler) { } // Client returns the Client attached to this endpoint. -func (d *Dispatch) Client() (*Client, error) { +func (d *Dispatch) Client() (*dispatchclient.Client, error) { return d.client, d.clientErr } diff --git a/dispatch_test.go b/dispatch_test.go index 9eb755b..a7ce532 100644 --- a/dispatch_test.go +++ b/dispatch_test.go @@ -8,6 +8,7 @@ import ( "connectrpc.com/connect" "github.com/dispatchrun/dispatch-go" + "github.com/dispatchrun/dispatch-go/dispatchclient" "github.com/dispatchrun/dispatch-go/dispatchproto" "github.com/dispatchrun/dispatch-go/dispatchtest" ) @@ -76,12 +77,12 @@ func TestDispatchCall(t *testing.T) { recorder := &dispatchtest.CallRecorder{} server := dispatchtest.NewServer(recorder) - client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL)) + client, err := dispatchclient.New(dispatchclient.APIKey("foobar"), dispatchclient.APIUrl(server.URL)) if err != nil { t.Fatal(err) } - endpoint, err := dispatch.New(dispatch.EndpointUrl("http://example.com"), client) + endpoint, err := dispatch.New(dispatch.EndpointUrl("http://example.com"), dispatch.Client(client)) if err != nil { t.Fatal(err) } @@ -144,12 +145,12 @@ func TestDispatchCallsBatch(t *testing.T) { server := dispatchtest.NewServer(&recorder) - client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL)) + client, err := dispatchclient.New(dispatchclient.APIKey("foobar"), dispatchclient.APIUrl(server.URL)) if err != nil { t.Fatal(err) } - endpoint, err := dispatch.New(dispatch.EndpointUrl("http://example.com"), client) + endpoint, err := dispatch.New(dispatch.EndpointUrl("http://example.com"), dispatch.Client(client)) if err != nil { t.Fatal(err) } diff --git a/client.go b/dispatchclient/client.go similarity index 85% rename from client.go rename to dispatchclient/client.go index a7a6a63..d9112d9 100644 --- a/client.go +++ b/dispatchclient/client.go @@ -1,6 +1,4 @@ -//go:build !durable - -package dispatch +package dispatchclient import ( "context" @@ -14,6 +12,7 @@ import ( "connectrpc.com/connect" "connectrpc.com/validate" "github.com/dispatchrun/dispatch-go/dispatchproto" + "github.com/dispatchrun/dispatch-go/internal/env" ) const defaultApiUrl = "https://api.dispatch.run" @@ -27,23 +26,23 @@ type Client struct { apiUrl string env []string httpClient *http.Client - opts []ClientOption + opts []Option client sdkv1connect.DispatchServiceClient } -// NewClient creates a Client. -func NewClient(opts ...ClientOption) (*Client, error) { +// New creates a Client. +func New(opts ...Option) (*Client, error) { c := &Client{ env: os.Environ(), opts: opts, } for _, opt := range opts { - opt.configureClient(c) + opt(c) } if c.apiKey == "" { - c.apiKey = getenv(c.env, "DISPATCH_API_KEY") + c.apiKey = env.Get(c.env, "DISPATCH_API_KEY") c.apiKeyFromEnv = true } if c.apiKey == "" { @@ -51,7 +50,7 @@ func NewClient(opts ...ClientOption) (*Client, error) { } if c.apiUrl == "" { - c.apiUrl = getenv(c.env, "DISPATCH_API_URL") + c.apiUrl = env.Get(c.env, "DISPATCH_API_URL") } if c.apiUrl == "" { c.apiUrl = defaultApiUrl @@ -81,22 +80,14 @@ func NewClient(opts ...ClientOption) (*Client, error) { } // ClientOption configures a Client. -type ClientOption interface { - configureClient(d *Client) -} - -type clientOptionFunc func(d *Client) - -func (fn clientOptionFunc) configureClient(d *Client) { - fn(d) -} +type Option func(*Client) // APIKey sets the Dispatch API key to use for authentication when // dispatching function calls through a Client. // // It defaults to the value of the DISPATCH_API_KEY environment variable. -func APIKey(apiKey string) ClientOption { - return clientOptionFunc(func(c *Client) { c.apiKey = apiKey }) +func APIKey(apiKey string) Option { + return func(c *Client) { c.apiKey = apiKey } } // APIUrl sets the URL of the Dispatch API. @@ -104,8 +95,16 @@ func APIKey(apiKey string) ClientOption { // It defaults to the value of the DISPATCH_API_URL environment variable, // or the default API URL (https://api.dispatch.run) if DISPATCH_API_URL // is unset. -func APIUrl(apiUrl string) ClientOption { - return clientOptionFunc(func(c *Client) { c.apiUrl = apiUrl }) +func APIUrl(apiUrl string) Option { + return func(c *Client) { c.apiUrl = apiUrl } +} + +// Env sets the environment variables that a Client parses its +// default configuration from. +// +// It defaults to os.Environ(). +func Env(env ...string) Option { + return func(c *Client) { c.env = env } } // Dispatch dispatches a function call. @@ -119,10 +118,6 @@ func (c *Client) Dispatch(ctx context.Context, call dispatchproto.Call) (dispatc return ids[0], nil } -func (c *Client) configureDispatch(d *Dispatch) { - d.client = c -} - // Batch creates a Batch. func (c *Client) Batch() Batch { return Batch{client: c} diff --git a/client_test.go b/dispatchclient/client_test.go similarity index 86% rename from client_test.go rename to dispatchclient/client_test.go index 15e2ac8..f025f26 100644 --- a/client_test.go +++ b/dispatchclient/client_test.go @@ -1,11 +1,11 @@ -package dispatch_test +package dispatchclient_test import ( "context" "net/http" "testing" - "github.com/dispatchrun/dispatch-go" + "github.com/dispatchrun/dispatch-go/dispatchclient" "github.com/dispatchrun/dispatch-go/dispatchproto" "github.com/dispatchrun/dispatch-go/dispatchtest" ) @@ -14,7 +14,7 @@ func TestClient(t *testing.T) { recorder := &dispatchtest.CallRecorder{} server := dispatchtest.NewServer(recorder) - client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL)) + client, err := dispatchclient.New(dispatchclient.APIKey("foobar"), dispatchclient.APIUrl(server.URL)) if err != nil { t.Fatal(err) } @@ -36,7 +36,7 @@ func TestClientEnvConfig(t *testing.T) { recorder := &dispatchtest.CallRecorder{} server := dispatchtest.NewServer(recorder) - client, err := dispatch.NewClient(dispatch.Env( + client, err := dispatchclient.New(dispatchclient.Env( "DISPATCH_API_KEY=foobar", "DISPATCH_API_URL="+server.URL, )) @@ -61,7 +61,7 @@ func TestClientBatch(t *testing.T) { recorder := &dispatchtest.CallRecorder{} server := dispatchtest.NewServer(recorder) - client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL)) + client, err := dispatchclient.New(dispatchclient.APIKey("foobar"), dispatchclient.APIUrl(server.URL)) if err != nil { t.Fatal(err) } @@ -98,7 +98,7 @@ func TestClientBatch(t *testing.T) { } func TestClientNoAPIKey(t *testing.T) { - _, err := dispatch.NewClient(dispatch.Env( /* i.e. no env vars */ )) + _, err := dispatchclient.New(dispatchclient.Env( /* i.e. no env vars */ )) if err == nil { t.Fatalf("expected an error") } else if err.Error() != "Dispatch API key has not been set. Use APIKey(..), or set the DISPATCH_API_KEY environment variable" { diff --git a/dispatchclient/serde.go b/dispatchclient/serde.go new file mode 100644 index 0000000..bb47225 --- /dev/null +++ b/dispatchclient/serde.go @@ -0,0 +1,24 @@ +package dispatchclient + +import "github.com/dispatchrun/coroutine/types" + +func init() { + types.Register(clientSerializer, clientDeserializer) +} + +func clientSerializer(s *types.Serializer, c *Client) error { + types.SerializeT(s, c.opts) + return nil +} + +func clientDeserializer(d *types.Deserializer, c *Client) error { + var opts []Option + types.DeserializeTo(d, &opts) + + client, err := New(opts...) + if err != nil { + return err + } + *c = *client + return nil +} diff --git a/dispatchproto/serde.go b/dispatchproto/serde.go new file mode 100644 index 0000000..4228b39 --- /dev/null +++ b/dispatchproto/serde.go @@ -0,0 +1,59 @@ +package dispatchproto + +import ( + "fmt" + + "github.com/dispatchrun/coroutine/types" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +func init() { + types.Register(protoSerializer, protoDeserializer) +} + +func protoSerializer(s *types.Serializer, mp *proto.Message) error { + m := *mp + if m == nil { + types.SerializeT(s, false) + return nil + } + + any, err := anypb.New(m) + if err != nil { + return fmt.Errorf("anypb.New: %w", err) + } + b, err := proto.Marshal(any) + if err != nil { + return fmt.Errorf("proto.Marshal: %w", err) + } + + types.SerializeT(s, true) + types.SerializeT(s, b) + + return nil +} + +func protoDeserializer(d *types.Deserializer, mp *proto.Message) error { + var ok bool + types.DeserializeTo(d, &ok) + if !ok { + *mp = nil + return nil + } + + var b []byte + types.DeserializeTo(d, &b) + + var any anypb.Any + if err := proto.Unmarshal(b, &any); err != nil { + return fmt.Errorf("proto.Unmarshal: %w", err) + } + m, err := any.UnmarshalNew() + if err != nil { + return fmt.Errorf("anypb.UnmarshalNew: %w", err) + } + *mp = m + + return nil +} diff --git a/dispatchtest/endpoint.go b/dispatchtest/endpoint.go index 8c9e420..99b2b6c 100644 --- a/dispatchtest/endpoint.go +++ b/dispatchtest/endpoint.go @@ -15,7 +15,7 @@ import ( // // Unlike dispatch.New, it starts a test server that serves the endpoint // and automatically sets the endpoint URL. -func NewEndpoint(opts ...dispatch.DispatchOption) (*dispatch.Dispatch, *EndpointServer, error) { +func NewEndpoint(opts ...dispatch.Option) (*dispatch.Dispatch, *EndpointServer, error) { mux := http.NewServeMux() server := httptest.NewServer(mux) diff --git a/env.go b/env.go deleted file mode 100644 index 044737c..0000000 --- a/env.go +++ /dev/null @@ -1,35 +0,0 @@ -//go:build !durable - -package dispatch - -import ( - "slices" - "strings" -) - -// Env sets the environment variables that a Dispatch endpoint -// or Client parses its default configuration from. -// -// It defaults to os.Environ(). -func Env(env ...string) interface { - DispatchOption - ClientOption -} { - return envOption(env) -} - -type envOption []string - -func (env envOption) configureDispatch(d *Dispatch) { d.env = slices.Clone(env) } -func (env envOption) configureClient(c *Client) { c.env = slices.Clone(env) } - -func getenv(env []string, name string) string { - var value string - for _, s := range env { - n, v, ok := strings.Cut(s, "=") - if ok && n == name { - value = v - } - } - return value -} diff --git a/internal/env/env.go b/internal/env/env.go new file mode 100644 index 0000000..b240568 --- /dev/null +++ b/internal/env/env.go @@ -0,0 +1,15 @@ +package env + +import "strings" + +// Get gets an environment variable value from a set of environment variables. +func Get(env []string, name string) string { + var value string + for _, s := range env { + n, v, ok := strings.Cut(s, "=") + if ok && n == name { + value = v + } + } + return value +} diff --git a/serde.go b/serde.go index cd2d98d..6682641 100644 --- a/serde.go +++ b/serde.go @@ -2,68 +2,14 @@ package dispatch -import ( - "fmt" - - "github.com/dispatchrun/coroutine/types" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" -) +import "github.com/dispatchrun/coroutine/types" func init() { - types.Register(protoSerializer, protoDeserializer) types.Register(dispatchSerializer, dispatchDeserializer) - types.Register(clientSerializer, clientDeserializer) -} - -func protoSerializer(s *types.Serializer, mp *proto.Message) error { - m := *mp - if m == nil { - types.SerializeT(s, false) - return nil - } - - any, err := anypb.New(m) - if err != nil { - return fmt.Errorf("anypb.New: %w", err) - } - b, err := proto.Marshal(any) - if err != nil { - return fmt.Errorf("proto.Marshal: %w", err) - } - - types.SerializeT(s, true) - types.SerializeT(s, b) - - return nil -} - -func protoDeserializer(d *types.Deserializer, mp *proto.Message) error { - var ok bool - types.DeserializeTo(d, &ok) - if !ok { - *mp = nil - return nil - } - - var b []byte - types.DeserializeTo(d, &b) - - var any anypb.Any - if err := proto.Unmarshal(b, &any); err != nil { - return fmt.Errorf("proto.Unmarshal: %w", err) - } - m, err := any.UnmarshalNew() - if err != nil { - return fmt.Errorf("anypb.UnmarshalNew: %w", err) - } - *mp = m - - return nil } type serializedDispatch struct { - opts []DispatchOption + opts []Option functions *FunctionRegistry } @@ -84,20 +30,3 @@ func dispatchDeserializer(d *types.Deserializer, c *Dispatch) error { *c = *dispatch //nolint return nil } - -func clientSerializer(s *types.Serializer, c *Client) error { - types.SerializeT(s, c.opts) - return nil -} - -func clientDeserializer(d *types.Deserializer, c *Client) error { - var opts []ClientOption - types.DeserializeTo(d, &opts) - - client, err := NewClient(opts...) - if err != nil { - return err - } - *c = *client - return nil -}