diff --git a/common/plugin/spawn.go b/common/plugin/spawn.go index 4b79ebb1f2..aeabdfc101 100644 --- a/common/plugin/spawn.go +++ b/common/plugin/spawn.go @@ -161,21 +161,32 @@ func Spawn[Client PingableClient]( return nil, nil, err } + dialCtx, cancel := context.WithTimeout(ctx, opts.startTimeout) + defer cancel() + // Wait for the plugin to start. client := rpc.Dial(makeClient, pluginEndpoint.String(), log.Trace) pingErr := make(chan error) go func() { retry := backoff.Backoff{Min: pluginRetryDelay, Max: pluginRetryDelay} - err := rpc.Wait(ctx, retry, opts.startTimeout, client) + err := rpc.Wait(dialCtx, retry, client) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // Deliberately don't close pingErr because the select loop below + // will catch dialCtx closing and return a better error. + return + } pingErr <- err close(pingErr) }() select { + case <-dialCtx.Done(): + return nil, nil, fmt.Errorf("plugin timed out while starting: %w", dialCtx.Err()) + case <-cmdCtx.Done(): return nil, nil, fmt.Errorf("plugin process died: %w", cmdCtx.Err()) - case err = <-pingErr: + case err := <-pingErr: if err != nil { return nil, nil, fmt.Errorf("plugin failed to respond to ping: %w", err) } diff --git a/frontend/cli/cmd_bench.go b/frontend/cli/cmd_bench.go index b678b0fd55..3df6265163 100644 --- a/frontend/cli/cmd_bench.go +++ b/frontend/cli/cmd_bench.go @@ -29,7 +29,9 @@ type benchCmd struct { } func (c *benchCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient) error { - if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil { + ctx, cancel := context.WithTimeout(ctx, c.Wait) + defer cancel() + if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil { return fmt.Errorf("FTL cluster did not become ready: %w", err) } logger := log.FromContext(ctx) diff --git a/frontend/cli/cmd_box_run.go b/frontend/cli/cmd_box_run.go index 06ee40af78..78e365a24d 100644 --- a/frontend/cli/cmd_box_run.go +++ b/frontend/cli/cmd_box_run.go @@ -69,7 +69,9 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er // Wait for the controller to come up. client := ftlv1connect.NewControllerServiceClient(rpc.GetHTTPClient(b.Bind.String()), b.Bind.String()) - if err := rpc.Wait(ctx, backoff.Backoff{}, b.ControllerTimeout, client); err != nil { + waitCtx, cancel := context.WithTimeout(ctx, b.ControllerTimeout) + defer cancel() + if err := rpc.Wait(waitCtx, backoff.Backoff{}, client); err != nil { return fmt.Errorf("controller failed to start: %w", err) } diff --git a/frontend/cli/cmd_call.go b/frontend/cli/cmd_call.go index 26e2bdb84d..ec82c9ff00 100644 --- a/frontend/cli/cmd_call.go +++ b/frontend/cli/cmd_call.go @@ -29,7 +29,9 @@ type callCmd struct { } func (c *callCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error { - if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil { + ctx, cancel := context.WithTimeout(ctx, c.Wait) + defer cancel() + if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil { return err } diff --git a/frontend/cli/cmd_ping.go b/frontend/cli/cmd_ping.go index 72bdf2998b..418331d285 100644 --- a/frontend/cli/cmd_ping.go +++ b/frontend/cli/cmd_ping.go @@ -15,5 +15,7 @@ type pingCmd struct { } func (c *pingCmd) Run(ctx context.Context, controller ftlv1connect.ControllerServiceClient) error { - return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, c.Wait, controller) //nolint:wrapcheck + ctx, cancel := context.WithTimeout(ctx, c.Wait) + defer cancel() + return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, controller) //nolint:wrapcheck } diff --git a/frontend/cli/cmd_replay.go b/frontend/cli/cmd_replay.go index 37a6425c95..ad4713c77d 100644 --- a/frontend/cli/cmd_replay.go +++ b/frontend/cli/cmd_replay.go @@ -26,15 +26,14 @@ type replayCmd struct { } func (c *replayCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error { - // Wait timeout is for both pings to complete, not each ping individually - startTime := time.Now() - - if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil { + ctx, cancel := context.WithTimeout(ctx, c.Wait) + defer cancel() + if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil { return fmt.Errorf("failed to wait for client: %w", err) } consoleServiceClient := rpc.Dial(pbconsoleconnect.NewConsoleServiceClient, cli.Endpoint.String(), log.Error) - if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait-time.Since(startTime), consoleServiceClient); err != nil { + if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, consoleServiceClient); err != nil { return fmt.Errorf("failed to wait for console service client: %w", err) } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 9733b5624a..869fc2d86f 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -83,7 +83,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini return err } if s.Provisioners > 0 { - if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil { + if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil { return fmt.Errorf("provisioner failed to start: %w", err) } } @@ -244,7 +244,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini return fmt.Errorf("controller failed to start: %w", err) } if s.Provisioners > 0 { - if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil { + if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil { return fmt.Errorf("provisioner failed to start: %w", err) } } diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index 1e3f878098..383e15688b 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -124,7 +124,10 @@ func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language, } func (p *externalPluginImpl) ping(ctx context.Context) error { - err := rpc.Wait(ctx, backoff.Backoff{}, launchTimeout, p.client) + retry := backoff.Backoff{} + heartbeatCtx, cancel := context.WithTimeout(ctx, launchTimeout) + defer cancel() + err := rpc.Wait(heartbeatCtx, retry, p.client) if err != nil { return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err)) } diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index b6643b2152..fa5e06f0d5 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -140,66 +140,31 @@ func (m mergedContext) Value(key any) any { return m.values.Value(key) } -type noopLogSync struct{} - -var _ log.Sink = noopLogSync{} - -func (noopLogSync) Log(entry log.Entry) error { return nil } - // Wait for a client to become available. // -// This will repeatedly call Ping() according to the retry policy until the client is -// ready or the deadline is reached. +// This will repeatedly call Ping() every 100ms until the service becomes +// ready. TODO: This will probably need to be smarter at some point. // // If "ctx" is cancelled this will return ctx.Err() -// -// Usually rpc errors are logged, but this function will silence ping call errors, and -// returns the last error if the deadline is reached. -func Wait(ctx context.Context, retry backoff.Backoff, deadline time.Duration, client Pingable) error { - errChan := make(chan error) - ctx, cancel := context.WithTimeout(ctx, deadline) - defer cancel() - - go func() { - logger := log.FromContext(ctx) - // create a context logger with a new one that does not log debug messages (which include each ping call failures) - silencedCtx := log.ContextWithLogger(ctx, log.New(log.Error, noopLogSync{})) - - start := time.Now() - // keep track of the last ping error - var err error - for { - select { - case <-ctx.Done(): - if err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) { - errChan <- err - } else { - errChan <- ctx.Err() - } - return - default: - } - var resp *connect.Response[ftlv1.PingResponse] - resp, err = client.Ping(silencedCtx, connect.NewRequest(&ftlv1.PingRequest{})) - if err == nil { - if resp.Msg.NotReady == nil { - logger.Debugf("Ping succeeded in %.2fs", time.Since(start).Seconds()) - errChan <- nil - return - } - err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady) +func Wait(ctx context.Context, retry backoff.Backoff, client Pingable) error { + logger := log.FromContext(ctx) + for { + select { + case <-ctx.Done(): + return ctx.Err() //nolint:wrapcheck + default: + } + resp, err := client.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{})) + if err == nil { + if resp.Msg.NotReady == nil { + return nil } - delay := retry.Duration() - logger.Tracef("Ping failed waiting %s for client: %+v", delay, err) - time.Sleep(delay) + err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady) } - }() - - err := <-errChan - if err != nil { - return err + delay := retry.Duration() + logger.Tracef("Ping failed waiting %s for client: %+v", delay, err) + time.Sleep(delay) } - return nil } // RetryStreamingClientStream will repeatedly call handler with the stream