Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use task broker's health endpoint #32

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/commands/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func (l *LaunchCommand) Execute() error {
logs.Debugf("Filtered environment variables")

for {
// 4. check until n8n instance is ready
// 4. check until task broker is ready

if err := http.CheckUntilN8nReady(envCfg.MainServerURI); err != nil {
return fmt.Errorf("encountered error while waiting for n8n to be ready: %w", err)
if err := http.CheckUntilBrokerReady(envCfg.TaskBrokerServerURI); err != nil {
return fmt.Errorf("encountered error while waiting for broker to be ready: %w", err)
}

// 5. fetch grant token for launcher
Expand All @@ -86,9 +86,9 @@ func (l *LaunchCommand) Execute() error {
err = ws.Handshake(handshakeCfg)
switch {
case errors.Is(err, errs.ErrServerDown):
logs.Warn("n8n is down, launcher will try to reconnect...")
logs.Warn("Task broker is down, launcher will try to reconnect...")
time.Sleep(time.Second * 5)
continue // back to checking until n8n ready
continue // back to checking until broker ready
case err != nil:
return fmt.Errorf("handshake failed: %w", err)
}
Expand Down
20 changes: 4 additions & 16 deletions internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@ const (
EnvVarGrantToken = "N8N_RUNNERS_GRANT_TOKEN"

// ------------------------
// n8n main
// task broker
// ------------------------

// EnvVarMainServerURI is the env var for the URI of the n8n main instance's
// main server, typically at http://127.0.0.1:5678.
EnvVarMainServerURI = "N8N_MAIN_URI"

// EnvVarTaskBrokerServerURI is the env var for the URI of the n8n main
// instance's task broker server, typically at http://127.0.0.1:5679.
// EnvVarTaskBrokerServerURI is the env var for the URI of the
// task broker server, typically at http://127.0.0.1:5679. Typically
// the broker server runs inside an n8n instance (main or worker).
EnvVarTaskBrokerServerURI = "N8N_TASK_BROKER_URI"

// ------------------------
Expand Down Expand Up @@ -122,7 +119,6 @@ func validateURL(urlStr string, fieldName string) error {
// nolint:revive // exported
type EnvConfig struct {
AuthToken string
MainServerURI string
TaskBrokerServerURI string
}

Expand All @@ -132,20 +128,13 @@ func FromEnv() (*EnvConfig, error) {
var errs []error

authToken := os.Getenv(EnvVarAuthToken)
mainServerURI := os.Getenv(EnvVarMainServerURI)
taskBrokerServerURI := os.Getenv(EnvVarTaskBrokerServerURI)
idleTimeout := os.Getenv(EnvVarIdleTimeout)

if authToken == "" {
errs = append(errs, fmt.Errorf("%s is required", EnvVarAuthToken))
}

if mainServerURI == "" {
mainServerURI = DefaultMainServerURI
} else if err := validateURL(mainServerURI, EnvVarMainServerURI); err != nil {
errs = append(errs, err)
}

if taskBrokerServerURI == "" {
taskBrokerServerURI = DefaultTaskBrokerServerURI
} else if err := validateURL(taskBrokerServerURI, EnvVarTaskBrokerServerURI); err != nil {
Expand All @@ -167,7 +156,6 @@ func FromEnv() (*EnvConfig, error) {

return &EnvConfig{
AuthToken: authToken,
MainServerURI: mainServerURI,
TaskBrokerServerURI: taskBrokerServerURI,
}, nil
}
37 changes: 3 additions & 34 deletions internal/env/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,89 +154,59 @@ func TestFromEnv(t *testing.T) {
name: "valid custom configuration",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://localhost:9000",
EnvVarTaskBrokerServerURI: "http://localhost:9001",
EnvVarIdleTimeout: "30",
},
expected: &EnvConfig{
AuthToken: "token123",
MainServerURI: "http://localhost:9000",
TaskBrokerServerURI: "http://localhost:9001",
},
},
{
name: "missing auth token",
envVars: map[string]string{
EnvVarMainServerURI: "http://localhost:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
},
expectError: true,
},
{
name: "invalid main server URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://\\invalid:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
},
expectError: true,
},
{
name: "missing main server URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
},
expected: &EnvConfig{
AuthToken: "token123",
MainServerURI: DefaultMainServerURI,
TaskBrokerServerURI: "http://localhost:5679",
},
},
{
name: "invalid task broker server URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://localhost:5678",
EnvVarTaskBrokerServerURI: "http://\\invalid:5679",
},
expectError: true,
},
{
name: "missing task broker server URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://localhost:5678",
EnvVarAuthToken: "token123",
},
expected: &EnvConfig{
AuthToken: "token123",
MainServerURI: "http://localhost:5678",
TaskBrokerServerURI: DefaultTaskBrokerServerURI,
},
},
{
name: "missing scheme in 127.0.0.1 URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "127.0.0.1:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
EnvVarTaskBrokerServerURI: "127.0.0.1:5679",
},
expectError: true,
},
{
name: "missing scheme in localhost URI",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "localhost:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
EnvVarTaskBrokerServerURI: "localhost:5679",
},
expectError: true,
},
{
name: "invalid idle timeout",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://localhost:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
EnvVarIdleTimeout: "invalid",
},
Expand All @@ -246,7 +216,6 @@ func TestFromEnv(t *testing.T) {
name: "negative idle timeout",
envVars: map[string]string{
EnvVarAuthToken: "token123",
EnvVarMainServerURI: "http://localhost:5678",
EnvVarTaskBrokerServerURI: "http://localhost:5679",
EnvVarIdleTimeout: "-1",
},
Expand Down
4 changes: 2 additions & 2 deletions internal/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package errs
import "errors"

var (
// ErrServerDown is returned when the n8n runner server is down.
ErrServerDown = errors.New("n8n runner server is down")
// ErrServerDown is returned when the task broker server is down.
ErrServerDown = errors.New("task broker server is down")

// ErrWsMsgTooLarge is returned when the websocket message is too large for
// the launcher's websocket buffer.
Expand Down
53 changes: 53 additions & 0 deletions internal/http/check_until_broker_ready.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package http

import (
"fmt"
"net/http"
"task-runner-launcher/internal/logs"
"task-runner-launcher/internal/retry"
"time"
)

func sendHealthRequest(taskBrokerURI string) (*http.Response, error) {
url := fmt.Sprintf("%s/healthz", taskBrokerURI)

client := &http.Client{
Timeout: 5 * time.Second,
}

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

return client.Do(req)
}

// CheckUntilBrokerReady checks forever until the task broker is ready, i.e.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// CheckUntilBrokerReady checks forever until the task broker is ready, i.e.
// CheckUntilBrokerReady checks forever until the task broker server is ready, i.e.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to repeat the server everywhere. Task broker is a server

// In case of long-running migrations, readiness may take a long time.
// Returns nil when ready.
func CheckUntilBrokerReady(taskBrokerURI string) error {
logs.Info("Waiting for task broker to be ready...")

healthCheck := func() (string, error) {
resp, err := sendHealthRequest(taskBrokerURI)
if err != nil {
return "", fmt.Errorf("task broker readiness check failed with error: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("task broker readiness check failed with status code: %d", resp.StatusCode)
}

return "", nil
}

if _, err := retry.UnlimitedRetry("readiness-check", healthCheck); err != nil {
return err
}

logs.Info("Task broker is ready")

return nil
}
53 changes: 0 additions & 53 deletions internal/http/check_until_n8n_ready.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/http/fetch_grant_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func sendGrantTokenRequest(taskBrokerServerURI, authToken string) (string, error
}

// FetchGrantToken exchanges the launcher's auth token for a single-use
// grant token from the n8n main instance. In case the n8n main instance is
// grant token from the task broker. In case the task broker is
// temporarily unavailable, this exchange is retried a limited number of times.
func FetchGrantToken(taskBrokerServerURI, authToken string) (string, error) {
grantTokenFetch := func() (string, error) {
Expand Down
14 changes: 7 additions & 7 deletions internal/ws/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func validateConfig(cfg HandshakeConfig) error {
}

if cfg.TaskBrokerServerURI == "" {
return fmt.Errorf("n8n URI is missing")
return fmt.Errorf("task broker URI is missing")
}

if cfg.GrantToken == "" {
Expand All @@ -55,11 +55,11 @@ func validateConfig(cfg HandshakeConfig) error {
func buildWebsocketURL(taskBrokerServerURI, runnerID string) (*url.URL, error) {
u, err := url.Parse(taskBrokerServerURI)
if err != nil {
return nil, fmt.Errorf("invalid n8n URI: %w", err)
return nil, fmt.Errorf("invalid task broker URI: %w", err)
}

if u.RawQuery != "" {
return nil, fmt.Errorf("n8n URI must have no query params")
return nil, fmt.Errorf("task broker URI must have no query params")
}

u.Scheme = "ws"
Expand Down Expand Up @@ -103,10 +103,10 @@ func isWsCloseError(err error) bool {
return ok
}

// Handshake is the flow where the launcher connects via websocket with main,
// registers with main's task broker, sends a non-expiring task offer to main, and
// receives the accept for that offer from main. Note that the handshake completes
// only once this task offer is accepted, which may take time.
// Handshake is the flow where the launcher connects via websocket with task broker,
// registers, sends a non-expiring task offer, and receives the accept for that
// offer. Note that the handshake completes only once this task offer is accepted,
// which may take time.
func Handshake(cfg HandshakeConfig) error {
if err := validateConfig(cfg); err != nil {
return fmt.Errorf("received invalid handshake config: %w", err)
Expand Down