From e2df86bad07e17c276ec3fb921d79bd3ba8b8e65 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:09:30 +0200 Subject: [PATCH 1/5] feat: Use task broker's health endpoint Task broker now has a health endpoint, which we can now use. That way we don't need the n8n's api server's URI anymore, simplifying the config. --- internal/commands/launch.go | 10 ++--- internal/env/env.go | 20 ++-------- internal/env/env_test.go | 37 ++----------------- internal/errs/errs.go | 4 +- ...n_ready.go => check_until_broker_ready.go} | 16 ++++---- internal/http/fetch_grant_token.go | 2 +- internal/ws/handshake.go | 14 +++---- 7 files changed, 30 insertions(+), 73 deletions(-) rename internal/http/{check_until_n8n_ready.go => check_until_broker_ready.go} (62%) diff --git a/internal/commands/launch.go b/internal/commands/launch.go index 8734205..457b876 100644 --- a/internal/commands/launch.go +++ b/internal/commands/launch.go @@ -60,10 +60,10 @@ func (l *LaunchCommand) Execute() error { logs.Debugf("Filtered environment variables") for { - // 4. check until n8n instance is ready + // 4. check until task broker is ready - if err := http.CheckUntilN8nReady(envCfg.MainServerURI); err != nil { - return fmt.Errorf("encountered error while waiting for n8n to be ready: %w", err) + if err := http.CheckUntilBrokerReady(envCfg.TaskBrokerServerURI); err != nil { + return fmt.Errorf("encountered error while waiting for broker to be ready: %w", err) } // 5. fetch grant token for launcher @@ -86,9 +86,9 @@ func (l *LaunchCommand) Execute() error { err = ws.Handshake(handshakeCfg) switch { case errors.Is(err, errs.ErrServerDown): - logs.Warn("n8n is down, launcher will try to reconnect...") + logs.Warn("Task broker is down, launcher will try to reconnect...") time.Sleep(time.Second * 5) - continue // back to checking until n8n ready + continue // back to checking until broker ready case err != nil: return fmt.Errorf("handshake failed: %w", err) } diff --git a/internal/env/env.go b/internal/env/env.go index f9a62ac..6ae29d8 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -26,15 +26,12 @@ const ( EnvVarGrantToken = "N8N_RUNNERS_GRANT_TOKEN" // ------------------------ - // n8n main + // task broker // ------------------------ - // EnvVarMainServerURI is the env var for the URI of the n8n main instance's - // main server, typically at http://127.0.0.1:5678. - EnvVarMainServerURI = "N8N_MAIN_URI" - - // EnvVarTaskBrokerServerURI is the env var for the URI of the n8n main - // instance's task broker server, typically at http://127.0.0.1:5679. + // EnvVarTaskBrokerServerURI is the env var for the URI of the n8n + // task broker server, typically at http://127.0.0.1:5679. Typically + // the broker server runs inside an n8n instance (main or worker). EnvVarTaskBrokerServerURI = "N8N_TASK_BROKER_URI" // ------------------------ @@ -122,7 +119,6 @@ func validateURL(urlStr string, fieldName string) error { // nolint:revive // exported type EnvConfig struct { AuthToken string - MainServerURI string TaskBrokerServerURI string } @@ -132,7 +128,6 @@ func FromEnv() (*EnvConfig, error) { var errs []error authToken := os.Getenv(EnvVarAuthToken) - mainServerURI := os.Getenv(EnvVarMainServerURI) taskBrokerServerURI := os.Getenv(EnvVarTaskBrokerServerURI) idleTimeout := os.Getenv(EnvVarIdleTimeout) @@ -140,12 +135,6 @@ func FromEnv() (*EnvConfig, error) { errs = append(errs, fmt.Errorf("%s is required", EnvVarAuthToken)) } - if mainServerURI == "" { - mainServerURI = DefaultMainServerURI - } else if err := validateURL(mainServerURI, EnvVarMainServerURI); err != nil { - errs = append(errs, err) - } - if taskBrokerServerURI == "" { taskBrokerServerURI = DefaultTaskBrokerServerURI } else if err := validateURL(taskBrokerServerURI, EnvVarTaskBrokerServerURI); err != nil { @@ -167,7 +156,6 @@ func FromEnv() (*EnvConfig, error) { return &EnvConfig{ AuthToken: authToken, - MainServerURI: mainServerURI, TaskBrokerServerURI: taskBrokerServerURI, }, nil } diff --git a/internal/env/env_test.go b/internal/env/env_test.go index d3e7ac4..29e0359 100644 --- a/internal/env/env_test.go +++ b/internal/env/env_test.go @@ -154,50 +154,25 @@ func TestFromEnv(t *testing.T) { name: "valid custom configuration", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://localhost:9000", EnvVarTaskBrokerServerURI: "http://localhost:9001", EnvVarIdleTimeout: "30", }, expected: &EnvConfig{ AuthToken: "token123", - MainServerURI: "http://localhost:9000", TaskBrokerServerURI: "http://localhost:9001", }, }, { name: "missing auth token", envVars: map[string]string{ - EnvVarMainServerURI: "http://localhost:5678", EnvVarTaskBrokerServerURI: "http://localhost:5679", }, expectError: true, }, - { - name: "invalid main server URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://\\invalid:5678", - EnvVarTaskBrokerServerURI: "http://localhost:5679", - }, - expectError: true, - }, - { - name: "missing main server URI", - envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarTaskBrokerServerURI: "http://localhost:5679", - }, - expected: &EnvConfig{ - AuthToken: "token123", - MainServerURI: DefaultMainServerURI, - TaskBrokerServerURI: "http://localhost:5679", - }, - }, { name: "invalid task broker server URI", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://localhost:5678", EnvVarTaskBrokerServerURI: "http://\\invalid:5679", }, expectError: true, @@ -205,12 +180,10 @@ func TestFromEnv(t *testing.T) { { name: "missing task broker server URI", envVars: map[string]string{ - EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://localhost:5678", + EnvVarAuthToken: "token123", }, expected: &EnvConfig{ AuthToken: "token123", - MainServerURI: "http://localhost:5678", TaskBrokerServerURI: DefaultTaskBrokerServerURI, }, }, @@ -218,8 +191,7 @@ func TestFromEnv(t *testing.T) { name: "missing scheme in 127.0.0.1 URI", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "127.0.0.1:5678", - EnvVarTaskBrokerServerURI: "http://localhost:5679", + EnvVarTaskBrokerServerURI: "127.0.0.1:5679", }, expectError: true, }, @@ -227,8 +199,7 @@ func TestFromEnv(t *testing.T) { name: "missing scheme in localhost URI", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "localhost:5678", - EnvVarTaskBrokerServerURI: "http://localhost:5679", + EnvVarTaskBrokerServerURI: "localhost:5679", }, expectError: true, }, @@ -236,7 +207,6 @@ func TestFromEnv(t *testing.T) { name: "invalid idle timeout", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://localhost:5678", EnvVarTaskBrokerServerURI: "http://localhost:5679", EnvVarIdleTimeout: "invalid", }, @@ -246,7 +216,6 @@ func TestFromEnv(t *testing.T) { name: "negative idle timeout", envVars: map[string]string{ EnvVarAuthToken: "token123", - EnvVarMainServerURI: "http://localhost:5678", EnvVarTaskBrokerServerURI: "http://localhost:5679", EnvVarIdleTimeout: "-1", }, diff --git a/internal/errs/errs.go b/internal/errs/errs.go index 26128ed..8ce074f 100644 --- a/internal/errs/errs.go +++ b/internal/errs/errs.go @@ -3,8 +3,8 @@ package errs import "errors" var ( - // ErrServerDown is returned when the n8n runner server is down. - ErrServerDown = errors.New("n8n runner server is down") + // ErrServerDown is returned when the task broker server is down. + ErrServerDown = errors.New("task broker server is down") // ErrWsMsgTooLarge is returned when the websocket message is too large for // the launcher's websocket buffer. diff --git a/internal/http/check_until_n8n_ready.go b/internal/http/check_until_broker_ready.go similarity index 62% rename from internal/http/check_until_n8n_ready.go rename to internal/http/check_until_broker_ready.go index d079687..4f15184 100644 --- a/internal/http/check_until_n8n_ready.go +++ b/internal/http/check_until_broker_ready.go @@ -8,8 +8,8 @@ import ( "time" ) -func sendReadinessRequest(n8nMainServerURI string) (*http.Response, error) { - url := fmt.Sprintf("%s/healthz/readiness", n8nMainServerURI) +func sendHealthRequest(taskBrokerURI string) (*http.Response, error) { + url := fmt.Sprintf("%s/healthz", taskBrokerURI) client := &http.Client{ Timeout: 5 * time.Second, @@ -23,16 +23,16 @@ func sendReadinessRequest(n8nMainServerURI string) (*http.Response, error) { return client.Do(req) } -// CheckUntilN8nReady checks forever until the n8n main instance is ready, i.e. +// CheckUntilBrokerReady checks forever until the task broker is ready, i.e. // until its DB is connected and migrated. In case of long-running migrations, // readiness may take a long time. Returns nil when ready. -func CheckUntilN8nReady(n8nMainServerURI string) error { - logs.Info("Waiting for n8n to be ready...") +func CheckUntilBrokerReady(taskBrokerURI string) error { + logs.Info("Waiting for task broker to be ready...") readinessCheck := func() (string, error) { - resp, err := sendReadinessRequest(n8nMainServerURI) + resp, err := sendHealthRequest(taskBrokerURI) if err != nil { - return "", fmt.Errorf("n8n readiness check failed with error: %w", err) + return "", fmt.Errorf("task broker readiness check failed with error: %w", err) } defer resp.Body.Close() @@ -47,7 +47,7 @@ func CheckUntilN8nReady(n8nMainServerURI string) error { return err } - logs.Info("n8n instance is ready") + logs.Info("Task broker is ready") return nil } diff --git a/internal/http/fetch_grant_token.go b/internal/http/fetch_grant_token.go index a9f79dd..fa68d56 100644 --- a/internal/http/fetch_grant_token.go +++ b/internal/http/fetch_grant_token.go @@ -49,7 +49,7 @@ func sendGrantTokenRequest(taskBrokerServerURI, authToken string) (string, error } // FetchGrantToken exchanges the launcher's auth token for a single-use -// grant token from the n8n main instance. In case the n8n main instance is +// grant token from the task broker. In case the task broker is // temporarily unavailable, this exchange is retried a limited number of times. func FetchGrantToken(taskBrokerServerURI, authToken string) (string, error) { grantTokenFetch := func() (string, error) { diff --git a/internal/ws/handshake.go b/internal/ws/handshake.go index c0c4152..4486dcd 100644 --- a/internal/ws/handshake.go +++ b/internal/ws/handshake.go @@ -42,7 +42,7 @@ func validateConfig(cfg HandshakeConfig) error { } if cfg.TaskBrokerServerURI == "" { - return fmt.Errorf("n8n URI is missing") + return fmt.Errorf("task broker URI is missing") } if cfg.GrantToken == "" { @@ -55,11 +55,11 @@ func validateConfig(cfg HandshakeConfig) error { func buildWebsocketURL(taskBrokerServerURI, runnerID string) (*url.URL, error) { u, err := url.Parse(taskBrokerServerURI) if err != nil { - return nil, fmt.Errorf("invalid n8n URI: %w", err) + return nil, fmt.Errorf("invalid Task Broker URI: %w", err) } if u.RawQuery != "" { - return nil, fmt.Errorf("n8n URI must have no query params") + return nil, fmt.Errorf("task broker URI must have no query params") } u.Scheme = "ws" @@ -103,10 +103,10 @@ func isWsCloseError(err error) bool { return ok } -// Handshake is the flow where the launcher connects via websocket with main, -// registers with main's task broker, sends a non-expiring task offer to main, and -// receives the accept for that offer from main. Note that the handshake completes -// only once this task offer is accepted, which may take time. +// Handshake is the flow where the launcher connects via websocket with task broker, +// registers, sends a non-expiring task offer, and receives the accept for that +// offer. Note that the handshake completes only once this task offer is accepted, +// which may take time. func Handshake(cfg HandshakeConfig) error { if err := validateConfig(cfg); err != nil { return fmt.Errorf("received invalid handshake config: %w", err) From 8ae31d1ed9705378f054838da56846904e2c1c24 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:19:20 +0200 Subject: [PATCH 2/5] Update internal/http/check_until_broker_ready.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- internal/http/check_until_broker_ready.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/http/check_until_broker_ready.go b/internal/http/check_until_broker_ready.go index 4f15184..f66761c 100644 --- a/internal/http/check_until_broker_ready.go +++ b/internal/http/check_until_broker_ready.go @@ -32,7 +32,7 @@ func CheckUntilBrokerReady(taskBrokerURI string) error { readinessCheck := func() (string, error) { resp, err := sendHealthRequest(taskBrokerURI) if err != nil { - return "", fmt.Errorf("task broker readiness check failed with error: %w", err) + return "", fmt.Errorf("task broker health check failed with error: %w", err) } defer resp.Body.Close() From c6d0e229d70e97837db3b419ee2ffe90760a2c83 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:20:51 +0200 Subject: [PATCH 3/5] Update internal/env/env.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- internal/env/env.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/env/env.go b/internal/env/env.go index 6ae29d8..da7ae29 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -29,7 +29,7 @@ const ( // task broker // ------------------------ - // EnvVarTaskBrokerServerURI is the env var for the URI of the n8n + // EnvVarTaskBrokerServerURI is the env var for the URI of the // task broker server, typically at http://127.0.0.1:5679. Typically // the broker server runs inside an n8n instance (main or worker). EnvVarTaskBrokerServerURI = "N8N_TASK_BROKER_URI" From 431a911146cbf1ebfa91f90ac1bc7a3c382a83ce Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:21:00 +0200 Subject: [PATCH 4/5] Update internal/ws/handshake.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- internal/ws/handshake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/ws/handshake.go b/internal/ws/handshake.go index 4486dcd..cda8cc9 100644 --- a/internal/ws/handshake.go +++ b/internal/ws/handshake.go @@ -55,7 +55,7 @@ func validateConfig(cfg HandshakeConfig) error { func buildWebsocketURL(taskBrokerServerURI, runnerID string) (*url.URL, error) { u, err := url.Parse(taskBrokerServerURI) if err != nil { - return nil, fmt.Errorf("invalid Task Broker URI: %w", err) + return nil, fmt.Errorf("invalid task broker URI: %w", err) } if u.RawQuery != "" { From ccf241c24e935689d45b3ec0147a8f4737d0c8f0 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 27 Nov 2024 14:23:10 +0200 Subject: [PATCH 5/5] chore: Fix comment and error message wording --- internal/http/check_until_broker_ready.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/http/check_until_broker_ready.go b/internal/http/check_until_broker_ready.go index f66761c..d39549e 100644 --- a/internal/http/check_until_broker_ready.go +++ b/internal/http/check_until_broker_ready.go @@ -24,26 +24,26 @@ func sendHealthRequest(taskBrokerURI string) (*http.Response, error) { } // CheckUntilBrokerReady checks forever until the task broker is ready, i.e. -// until its DB is connected and migrated. In case of long-running migrations, -// readiness may take a long time. Returns nil when ready. +// In case of long-running migrations, readiness may take a long time. +// Returns nil when ready. func CheckUntilBrokerReady(taskBrokerURI string) error { logs.Info("Waiting for task broker to be ready...") - readinessCheck := func() (string, error) { + healthCheck := func() (string, error) { resp, err := sendHealthRequest(taskBrokerURI) if err != nil { - return "", fmt.Errorf("task broker health check failed with error: %w", err) + return "", fmt.Errorf("task broker readiness check failed with error: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("readiness check failed with status code: %d", resp.StatusCode) + return "", fmt.Errorf("task broker readiness check failed with status code: %d", resp.StatusCode) } return "", nil } - if _, err := retry.UnlimitedRetry("readiness-check", readinessCheck); err != nil { + if _, err := retry.UnlimitedRetry("readiness-check", healthCheck); err != nil { return err }