From 58396853ca2e8b1b6e4c67a97016d3b949964b9f Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 4 Dec 2024 12:17:51 +1100 Subject: [PATCH] feat: add routing table package (#3613) This is a prototype routing package that syncs routes from the schema. It will no doubt need adjustment. --- internal/routing/routing.go | 70 ++++++++++++++++++++++++++++++++ internal/routing/routing_test.go | 48 ++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 internal/routing/routing.go create mode 100644 internal/routing/routing_test.go diff --git a/internal/routing/routing.go b/internal/routing/routing.go new file mode 100644 index 0000000000..8374239d51 --- /dev/null +++ b/internal/routing/routing.go @@ -0,0 +1,70 @@ +package routing + +import ( + "context" + "net/url" + + "github.com/alecthomas/atomic" + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" +) + +type RouteTable struct { + // Routes keyed by module name. TODO: this should be keyed by deployment key. + routes *atomic.Value[map[string]*url.URL] +} + +func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable { + r := &RouteTable{ + routes: atomic.New(extractRoutes(ctx, changes.View())), + } + go r.run(ctx, changes) + return r +} + +func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) { + for { + select { + case <-ctx.Done(): + return + + case event := <-changes.Events(): + routes := extractRoutes(ctx, event.Schema()) + r.routes.Store(routes) + } + } +} + +// Get returns the URL for the given deployment or None if it doesn't exist. +func (r *RouteTable) Get(deployment model.DeploymentKey) optional.Option[*url.URL] { + routes := r.routes.Load() + return optional.Zero(routes[deployment.Payload.Module]) +} + +// GetForModule returns the URL for the given module or None if it doesn't exist. +func (r *RouteTable) GetForModule(module string) optional.Option[*url.URL] { + routes := r.routes.Load() + return optional.Zero(routes[module]) +} + +func extractRoutes(ctx context.Context, schema *schema.Schema) map[string]*url.URL { + logger := log.FromContext(ctx) + out := make(map[string]*url.URL, len(schema.Modules)) + for _, module := range schema.Modules { + if module.Runtime == nil || module.Runtime.Deployment == nil { + continue + } + rt := module.Runtime.Deployment + u, err := url.Parse(rt.Endpoint) + if err != nil { + logger.Warnf("Failed to parse endpoint URL for module %q: %v", module.Name, err) + continue + } + out[module.Name] = u + } + return out +} diff --git a/internal/routing/routing_test.go b/internal/routing/routing_test.go new file mode 100644 index 0000000000..994cce357a --- /dev/null +++ b/internal/routing/routing_test.go @@ -0,0 +1,48 @@ +package routing + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/must" + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" +) + +func TestRouting(t *testing.T) { + events := schemaeventsource.NewUnattached() + events.Publish(schemaeventsource.EventUpsert{ + Module: &schema.Module{ + Name: "time", + Runtime: &schema.ModuleRuntime{ + Deployment: &schema.ModuleRuntimeDeployment{ + Endpoint: "http://time.ftl", + }, + }, + }, + }) + + rt := New(log.ContextWithNewDefaultLogger(context.TODO()), events) + assert.Equal(t, optional.Some(must.Get(url.Parse("http://time.ftl"))), rt.GetForModule("time")) + assert.Equal(t, optional.None[*url.URL](), rt.GetForModule("echo")) + + events.Publish(schemaeventsource.EventUpsert{ + Module: &schema.Module{ + Name: "echo", + Runtime: &schema.ModuleRuntime{ + Deployment: &schema.ModuleRuntimeDeployment{ + Endpoint: "http://echo.ftl", + }, + }, + }, + }) + + time.Sleep(time.Millisecond * 250) + assert.Equal(t, optional.Some(must.Get(url.Parse("http://echo.ftl"))), rt.GetForModule("echo")) +}