Skip to content

Commit

Permalink
fix: fail BindAllocator.Next() allocation for required ports (#3215)
Browse files Browse the repository at this point in the history
The BindAllocator was designed for dynamically allocating ports, and
will skip bound ports. This is intentional, but we don't want the
initial controller ports (ingress and RPC) to be skipped if those ports
are already bound.

Fixes #3190
  • Loading branch information
alecthomas authored Oct 28, 2024
1 parent 7d74fbd commit 4435fcc
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 65 deletions.
11 changes: 9 additions & 2 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,20 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
logger := log.FromContext(ctx)

bind := l.portAllocator.Next()
bind, err := l.portAllocator.Next()
if err != nil {
return fmt.Errorf("failed to start runner: %w", err)
}
var debug *localdebug.DebugInfo
debugPort := 0
if ide, ok := l.ideSupport.Get(); ok {
debugBind, err := l.portAllocator.NextPort()
if err != nil {
return fmt.Errorf("failed to start runner: %w", err)
}
debug = &localdebug.DebugInfo{
Language: info.language,
Port: l.portAllocator.NextPort(),
Port: debugBind,
}
l.debugPorts[info.module] = debug
ide.SyncIDEDebugIntegrations(ctx, l.debugPorts)
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ func (b *boxCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC
}

// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint)
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint, 0)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()
_, _ = bindAllocator.Next() //nolint:errcheck

engine, err := buildengine.New(ctx, client, projConfig, b.Build.Dirs, bindAllocator, buildengine.BuildEnv(b.Build.BuildEnv), buildengine.Parallelism(b.Build.Parallelism))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_box_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
config.SetDefaults()

// Start the controller.
runnerPortAllocator, err := bind.NewBindAllocator(b.RunnerBase)
runnerPortAllocator, err := bind.NewBindAllocator(b.RunnerBase, 0)
if err != nil {
return fmt.Errorf("failed to create runner port allocator: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func (b *buildCmd) Run(ctx context.Context, client ftlv1connect.ControllerServic
return errors.New("no directories specified")
}
// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint)
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint, 0)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()
_, _ = bindAllocator.Next() //nolint:errcheck

engine, err := buildengine.New(ctx, client, projConfig, b.Dirs, bindAllocator, buildengine.BuildEnv(b.BuildEnv), buildengine.Parallelism(b.Parallelism))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func (d *deployCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
}

// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint)
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint, 0)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()
_, _ = bindAllocator.Next() //nolint:errcheck

engine, err := buildengine.New(ctx, client, projConfig, d.Build.Dirs, bindAllocator, buildengine.BuildEnv(d.Build.BuildEnv), buildengine.Parallelism(d.Build.Parallelism))
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions frontend/cli/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (d *devCmd) Run(ctx context.Context, k *kong.Kong, projConfig projectconfig
sm := terminal.FromContext(ctx)
starting := sm.NewStatus("\u001B[92mStarting FTL Server 🚀\u001B[39m")

bindAllocator, err := bind.NewBindAllocator(d.ServeCmd.Bind)
bindAllocator, err := bind.NewBindAllocator(d.ServeCmd.Bind, 1)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
Expand All @@ -82,9 +82,6 @@ func (d *devCmd) Run(ctx context.Context, k *kong.Kong, projConfig projectconfig
}
d.ServeCmd.Stop = false
}
if d.ServeCmd.isRunning(ctx, client) {
return errors.New(ftlRunningErrorMsg)
}

g.Go(func() error {
return d.ServeCmd.run(ctx, projConfig, optional.Some(controllerReady), true, bindAllocator)
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPl
return optionalPlugin, fmt.Errorf("could not parse default bind URL: %w", err)
}
var bindAllocator *bind.BindAllocator
bindAllocator, err = bind.NewBindAllocator(pluginBind)
bindAllocator, err = bind.NewBindAllocator(pluginBind, 0)
if err != nil {
return optionalPlugin, fmt.Errorf("could not create bind allocator: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_schema_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (d *schemaDiffCmd) Run(ctx context.Context, currentURL *url.URL, projConfig

// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
var bindAllocator *bind.BindAllocator
bindAllocator, err = bind.NewBindAllocator(cli.Endpoint)
bindAllocator, err = bind.NewBindAllocator(cli.Endpoint, 0)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()
_, _ = bindAllocator.Next() //nolint:errcheck

other, err = localSchema(ctx, projConfig, bindAllocator)
} else {
Expand Down
47 changes: 16 additions & 31 deletions frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"net"
"net/url"
"os"
osExec "os/exec" //nolint:depguard
Expand Down Expand Up @@ -57,26 +56,16 @@ type serveCmd struct {
const ftlRunningErrorMsg = "FTL is already running. Use 'ftl serve --stop' to stop it"

func (s *serveCmd) Run(ctx context.Context, projConfig projectconfig.Config) error {
bindAllocator, err := bind.NewBindAllocator(s.Bind)
bindAllocator, err := bind.NewBindAllocator(s.Bind, 2)
if err != nil {
return fmt.Errorf("could not create bind allocator: %w", err)
}
return s.run(ctx, projConfig, optional.None[chan bool](), false, bindAllocator)
}

//nolint:maintidx
func isPortAvailable(host string, port string) bool {
ln, err := net.Listen("tcp", net.JoinHostPort(host, port))
if err != nil {
return false
}
ln.Close()
return true
}

func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, initialised optional.Option[chan bool], devMode bool, bindAllocator *bind.BindAllocator) error {
logger := log.FromContext(ctx)

controllerClient := rpc.ClientFromContext[ftlv1connect.ControllerServiceClient](ctx)
provisionerClient := rpc.ClientFromContext[provisionerconnect.ProvisionerServiceClient](ctx)

Expand All @@ -86,13 +75,6 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
_ = KillBackgroundServe(logger) //nolint:errcheck // ignore error here if the process is not running
}

// Check port availability before starting in background
host := s.Bind.Hostname()
port := s.Bind.Port()
if !isPortAvailable(host, port) {
return fmt.Errorf("port %s is already in use on %s", port, host)
}

if err := runInBackground(logger); err != nil {
return err
}
Expand All @@ -113,10 +95,6 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
return KillBackgroundServe(logger)
}

if s.isRunning(ctx, controllerClient) {
return errors.New(ftlRunningErrorMsg)
}

if s.Provisioners > 0 {
logger.Infof("Starting FTL with %d controller(s) and %d provisioner(s)", s.Controllers, s.Provisioners)
} else {
Expand All @@ -138,8 +116,16 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
controllerAddresses := make([]*url.URL, 0, s.Controllers)
controllerIngressAddresses := make([]*url.URL, 0, s.Controllers)
for range s.Controllers {
controllerIngressAddresses = append(controllerIngressAddresses, bindAllocator.Next())
controllerAddresses = append(controllerAddresses, bindAllocator.Next())
ingressBind, err := bindAllocator.Next()
if err != nil {
return fmt.Errorf("could not allocate port for controller ingress: %w", err)
}
controllerIngressAddresses = append(controllerIngressAddresses, ingressBind)
controllerBind, err := bindAllocator.Next()
if err != nil {
return fmt.Errorf("could not allocate port for controller: %w", err)
}
controllerAddresses = append(controllerAddresses, controllerBind)
}

for _, addr := range controllerAddresses {
Expand All @@ -153,7 +139,11 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini

provisionerAddresses := make([]*url.URL, 0, s.Provisioners)
for range s.Provisioners {
provisionerAddresses = append(provisionerAddresses, bindAllocator.Next())
bind, err := bindAllocator.Next()
if err != nil {
return fmt.Errorf("could not allocate port for provisioner: %w", err)
}
provisionerAddresses = append(provisionerAddresses, bind)
}

runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration)
Expand Down Expand Up @@ -426,8 +416,3 @@ func waitForControllerOnline(ctx context.Context, startupTimeout time.Duration,
}
}
}

func (s *serveCmd) isRunning(ctx context.Context, client rpc.Pingable) bool {
_, err := client.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
return err == nil
}
34 changes: 24 additions & 10 deletions internal/bind/bind_allocator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bind

import (
"fmt"
"net"
"net/url"
"strconv"
Expand All @@ -9,11 +10,16 @@ import (
)

type BindAllocator struct {
baseURL *url.URL
port atomic.Int32
baseURL *url.URL
dynamicPortsAfter int
port atomic.Int32
}

func NewBindAllocator(url *url.URL) (*BindAllocator, error) {
// NewBindAllocator creates a BindAllocator, which dynamically allocates ports for binding local servers.
//
// "staticPorts" is the number of ports that are statically allocated and that must be free before dynamic ports
// are allocated.
func NewBindAllocator(url *url.URL, staticPorts int) (*BindAllocator, error) {
_, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return nil, err
Expand All @@ -25,12 +31,13 @@ func NewBindAllocator(url *url.URL) (*BindAllocator, error) {
}

return &BindAllocator{
baseURL: url,
port: atomic.NewInt32(int32(port) - 1), //nolint:gosec
baseURL: url,
port: atomic.NewInt32(int32(port) - 1), //nolint:gosec
dynamicPortsAfter: port + staticPorts,
}, nil
}

func (b *BindAllocator) NextPort() int {
func (b *BindAllocator) NextPort() (int, error) {
var l *net.TCPListener
var err error

Expand All @@ -43,19 +50,26 @@ func (b *BindAllocator) NextPort() int {
l, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(b.baseURL.Hostname()), Port: port})

if err != nil {
if port < b.dynamicPortsAfter {
return 0, fmt.Errorf("failed to bind to port %d: %w", port, err)
}
if tries >= maxTries {
panic("failed to find an open port: " + err.Error())
}
continue
}
_ = l.Close()

return port
return port, nil
}
}

func (b *BindAllocator) Next() *url.URL {
func (b *BindAllocator) Next() (*url.URL, error) {
newURL := *b.baseURL
newURL.Host = net.JoinHostPort(b.baseURL.Hostname(), strconv.Itoa(b.NextPort()))
return &newURL
nextPort, err := b.NextPort()
if err != nil {
return nil, err
}
newURL.Host = net.JoinHostPort(b.baseURL.Hostname(), strconv.Itoa(nextPort))
return &newURL, nil
}
2 changes: 1 addition & 1 deletion internal/buildengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestGraph(t *testing.T) {

bindURL, err := url.Parse("http://127.0.0.1:8893")
assert.NoError(t, err)
bindAllocator, err := bind.NewBindAllocator(bindURL)
bindAllocator, err := bind.NewBindAllocator(bindURL, 0)
assert.NoError(t, err)

projConfig := projectconfig.Config{
Expand Down
14 changes: 8 additions & 6 deletions internal/buildengine/languageplugin/external_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/result"
"github.com/bmatcuk/doublestar/v4"
"golang.org/x/sync/errgroup"

langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/bind"
Expand All @@ -25,10 +30,6 @@ import (
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/TBD54566975/ftl/internal/watch"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/result"
"github.com/bmatcuk/doublestar/v4"
"golang.org/x/sync/errgroup"
)

// These integration tests are meant as a test suite for external plugins.
Expand Down Expand Up @@ -238,10 +239,11 @@ func startPlugin() in.Action {
in.Infof("Starting plugin")
baseBind, err := url.Parse("http://127.0.0.1:8893")
assert.NoError(t, err)
bindAllocator, err := bind.NewBindAllocator(baseBind)
bindAllocator, err := bind.NewBindAllocator(baseBind, 0)
assert.NoError(t, err)

bindURL = bindAllocator.Next()
bindURL, err = bindAllocator.Next()
assert.NoError(t, err)
client, err = newExternalPluginImpl(ic.Context, bindURL, ic.Language)
assert.NoError(t, err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/buildengine/languageplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func New(ctx context.Context, bindAllocator *bind.BindAllocator, language string
case "rust":
return newRustPlugin(ctx), nil
default:
return newExternalPlugin(ctx, bindAllocator.Next(), language)
port, err := bindAllocator.Next()
if err != nil {
return nil, fmt.Errorf("failed to allocate port for external plugin: %w", err)
}
return newExternalPlugin(ctx, port, language)
}
}

Expand Down

0 comments on commit 4435fcc

Please sign in to comment.