Skip to content

Commit

Permalink
feat: proxy calls through the runner (#3586)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 3, 2024
1 parent d476a24 commit c5733cd
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 17 deletions.
91 changes: 91 additions & 0 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package proxy

import (
"context"
"fmt"

"connectrpc.com/connect"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/rpc/headers"
)

var _ ftlv1connect.VerbServiceHandler = &Service{}
var _ ftlv1connect.ModuleServiceHandler = &Service{}

type Service struct {
controllerVerbService ftlv1connect.VerbServiceClient
controllerModuleService ftlv1connect.ModuleServiceClient
}

func New(controllerVerbService ftlv1connect.VerbServiceClient, controllerModuleService ftlv1connect.ModuleServiceClient) *Service {
proxy := &Service{
controllerVerbService: controllerVerbService,
controllerModuleService: controllerModuleService,
}
return proxy
}

func (r *Service) GetModuleContext(ctx context.Context, c *connect.Request[ftlv1.GetModuleContextRequest], c2 *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
moduleContext, err := r.controllerModuleService.GetModuleContext(ctx, connect.NewRequest(c.Msg))
if err != nil {
return fmt.Errorf("failed to get module context: %w", err)
}
for {
rcv := moduleContext.Receive()
if rcv {
err := c2.Send(moduleContext.Msg())
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
} else if moduleContext.Err() != nil {
return fmt.Errorf("failed to receive message: %w", moduleContext.Err())
}
}

}

func (r *Service) AcquireLease(ctx context.Context, c *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
lease := r.controllerModuleService.AcquireLease(ctx)
for {
req, err := c.Receive()
if err != nil {
return fmt.Errorf("failed to receive message: %w", err)
}
err = lease.Send(req)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
msg, err := lease.Receive()
if err != nil {
return fmt.Errorf("failed to receive response message: %w", err)
}
err = c.Send(msg)
if err != nil {
return fmt.Errorf("failed to send response message: %w", err)
}
}

}

func (r *Service) PublishEvent(ctx context.Context, c *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
event, err := r.controllerModuleService.PublishEvent(ctx, connect.NewRequest(c.Msg))
if err != nil {
return nil, fmt.Errorf("failed to proxy event: %w", err)
}
return event, nil
}

func (r *Service) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (r *Service) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {

call, err := r.controllerVerbService.Call(ctx, headers.CopyRequestForForwarding(c))
if err != nil {
return nil, fmt.Errorf("failed to proxy verb: %w", err)
}
return call, nil
}
3 changes: 1 addition & 2 deletions backend/runner/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
pb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1"
pbconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1/publishpbconnect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/schema"
sl "github.com/TBD54566975/ftl/internal/slices"
)
Expand All @@ -20,7 +19,7 @@ type Service struct {

var _ pbconnect.PublishServiceHandler = (*Service)(nil)

func New(ctx context.Context, module *schema.Module, client ftlv1connect.VerbServiceClient) (*Service, error) {
func New(module *schema.Module) (*Service, error) {
publishers := map[string]*publisher{}
for t := range sl.FilterVariants[*schema.Topic](module.Decls) {
publisher, err := newPublisher(t)
Expand Down
52 changes: 39 additions & 13 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/runner/observability"
"github.com/TBD54566975/ftl/backend/runner/proxy"
"github.com/TBD54566975/ftl/backend/runner/pubsub"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/download"
Expand Down Expand Up @@ -178,7 +179,6 @@ func (s *Service) startDeployment(ctx context.Context, key model.DeploymentKey,
}()
return fmt.Errorf("failure in runner: %w", rpc.Serve(ctx, s.config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, s),
rpc.GRPC(pubconnect.NewPublishServiceHandler, s.pubSub),
rpc.HTTP("/", s),
rpc.HealthCheck(s.healthCheck),
))
Expand Down Expand Up @@ -302,8 +302,9 @@ type Service struct {
deploymentLogQueue chan log.Entry
cancelFunc func()
devEndpoint optional.Option[url.URL]

pubSub *pubsub.Service
proxy *proxy.Service
pubSub *pubsub.Service
proxyBindAddress *url.URL
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down Expand Up @@ -401,9 +402,41 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
return fmt.Errorf("failed to download artefacts: %w", err)
}

envVars := []string{
"FTL_ENDPOINT=" + s.config.ControllerEndpoint.String(),
"FTL_RUNNER_ENDPOINT=" + s.config.Bind.String(),
pubSub, err := pubsub.New(module)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to create pubsub service: %w", err)
}
s.pubSub = pubSub

moduleServiceClient := rpc.Dial(ftlv1connect.NewModuleServiceClient, s.config.ControllerEndpoint.String(), log.Error)
verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, s.config.ControllerEndpoint.String(), log.Error)
s.proxy = proxy.New(verbServiceClient, moduleServiceClient)

parse, err := url.Parse("http://127.0.0.1:0")
if err != nil {
return fmt.Errorf("failed to parse url: %w", err)
}
proxyServer, err := rpc.NewServer(ctx, parse,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, s.proxy),
rpc.GRPC(ftlv1connect.NewModuleServiceHandler, s.proxy),
rpc.GRPC(pubconnect.NewPublishServiceHandler, s.pubSub),
)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
urls := proxyServer.Bind.Subscribe(nil)
go func() {
err := proxyServer.Serve(ctx)
if err != nil {
logger.Errorf(err, "failed to serve")
return
}
}()
s.proxyBindAddress = <-urls

logger.Debugf("Setting FTL_ENDPOINT to %s", s.proxyBindAddress.String())
envVars := []string{"FTL_ENDPOINT=" + s.proxyBindAddress.String(),
"FTL_CONFIG=" + strings.Join(s.config.Config, ","),
"FTL_OBSERVABILITY_ENDPOINT=" + s.config.ControllerEndpoint.String()}
if s.config.DebugPort > 0 {
Expand All @@ -429,13 +462,6 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
dep = s.makeDeployment(cmdCtx, key, deployment)
}

pubSub, err := pubsub.New(ctx, module, dep.client)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to create pubsub service: %w", err)
}
s.pubSub = pubSub

s.readyTime.Store(time.Now().Add(time.Second * 2)) // Istio is a bit flakey, add a small delay for readiness
s.deployment.Store(optional.Some(dep))
logger.Debugf("Deployed %s", key)
Expand Down
3 changes: 1 addition & 2 deletions go-runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

type UserVerbConfig struct {
FTLEndpoint *url.URL `help:"FTL endpoint." env:"FTL_ENDPOINT" required:""`
RunnerEndpoint *url.URL `help:"Runner endpoint." env:"FTL_RUNNER_ENDPOINT" required:""`
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
Config []string `name:"config" short:"C" help:"Paths to FTL project configuration files." env:"FTL_CONFIG" placeholder:"FILE[,FILE,...]" type:"existingfile"`
}
Expand All @@ -43,7 +42,7 @@ func NewUserVerbServer(projectName string, moduleName string, handlers ...Handle
ctx = rpc.ContextWithClient(ctx, moduleServiceClient)
verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, verbServiceClient)
pubClient := rpc.Dial(pubconnect.NewPublishServiceClient, uc.RunnerEndpoint.String(), log.Error)
pubClient := rpc.Dial(pubconnect.NewPublishServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, pubClient)

moduleContextSupplier := modulecontext.NewModuleContextSupplier(moduleServiceClient)
Expand Down

0 comments on commit c5733cd

Please sign in to comment.