Skip to content

Commit

Permalink
feat: add routing table package (#3613)
Browse files Browse the repository at this point in the history
This is a prototype routing package that syncs routes from the schema.
It will no doubt need adjustment.
  • Loading branch information
alecthomas authored Dec 4, 2024
1 parent 8387b88 commit 5839685
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
70 changes: 70 additions & 0 deletions internal/routing/routing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package routing

import (
"context"
"net/url"

"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type RouteTable struct {
// Routes keyed by module name. TODO: this should be keyed by deployment key.
routes *atomic.Value[map[string]*url.URL]
}

func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable {
r := &RouteTable{
routes: atomic.New(extractRoutes(ctx, changes.View())),
}
go r.run(ctx, changes)
return r
}

func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSource) {
for {
select {
case <-ctx.Done():
return

case event := <-changes.Events():
routes := extractRoutes(ctx, event.Schema())
r.routes.Store(routes)
}
}
}

// Get returns the URL for the given deployment or None if it doesn't exist.
func (r *RouteTable) Get(deployment model.DeploymentKey) optional.Option[*url.URL] {
routes := r.routes.Load()
return optional.Zero(routes[deployment.Payload.Module])
}

// GetForModule returns the URL for the given module or None if it doesn't exist.
func (r *RouteTable) GetForModule(module string) optional.Option[*url.URL] {
routes := r.routes.Load()
return optional.Zero(routes[module])
}

func extractRoutes(ctx context.Context, schema *schema.Schema) map[string]*url.URL {
logger := log.FromContext(ctx)
out := make(map[string]*url.URL, len(schema.Modules))
for _, module := range schema.Modules {
if module.Runtime == nil || module.Runtime.Deployment == nil {
continue
}
rt := module.Runtime.Deployment
u, err := url.Parse(rt.Endpoint)
if err != nil {
logger.Warnf("Failed to parse endpoint URL for module %q: %v", module.Name, err)
continue
}
out[module.Name] = u
}
return out
}
48 changes: 48 additions & 0 deletions internal/routing/routing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package routing

import (
"context"
"net/url"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/must"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

func TestRouting(t *testing.T) {
events := schemaeventsource.NewUnattached()
events.Publish(schemaeventsource.EventUpsert{
Module: &schema.Module{
Name: "time",
Runtime: &schema.ModuleRuntime{
Deployment: &schema.ModuleRuntimeDeployment{
Endpoint: "http://time.ftl",
},
},
},
})

rt := New(log.ContextWithNewDefaultLogger(context.TODO()), events)
assert.Equal(t, optional.Some(must.Get(url.Parse("http://time.ftl"))), rt.GetForModule("time"))
assert.Equal(t, optional.None[*url.URL](), rt.GetForModule("echo"))

events.Publish(schemaeventsource.EventUpsert{
Module: &schema.Module{
Name: "echo",
Runtime: &schema.ModuleRuntime{
Deployment: &schema.ModuleRuntimeDeployment{
Endpoint: "http://echo.ftl",
},
},
},
})

time.Sleep(time.Millisecond * 250)
assert.Equal(t, optional.Some(must.Get(url.Parse("http://echo.ftl"))), rt.GetForModule("echo"))
}

0 comments on commit 5839685

Please sign in to comment.