From b0af210e28b8e36c3da7f8d05046b03f31d9daf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 14 Nov 2024 15:48:20 +0900 Subject: [PATCH 1/5] Fix ineffective assign --- core/services/feeds/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..61b2d53f2d5 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -362,8 +362,8 @@ func (s *service) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsMa return nil, errors.Wrap(err, "failed to list managers by IDs") } - for _, manager := range managers { - manager.IsConnectionActive = s.connMgr.IsConnected(manager.ID) + for i, manager := range managers { + managers[i].IsConnectionActive = s.connMgr.IsConnected(manager.ID) } return managers, nil From 7eded5cf3ef87f05dcd4696c7bb2e06495d09872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 14 Nov 2024 15:48:39 +0900 Subject: [PATCH 2/5] jd: If a node is already registered, just proceed --- deployment/environment/devenv/don.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deployment/environment/devenv/don.go b/deployment/environment/devenv/don.go index 830f5b921bc..92751ca94f6 100644 --- a/deployment/environment/devenv/don.go +++ b/deployment/environment/devenv/don.go @@ -367,7 +367,8 @@ func (n *Node) CreateJobDistributor(ctx context.Context, jd JobDistributor) (str func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor) error { // register the node in the job distributor err := n.RegisterNodeToJobDistributor(ctx, jd) - if err != nil { + // TODO: check for rpc code = "AlreadyExists" instead + if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { return err } // now create the job distributor in the node From 8d92b959fc72ff660e8a129a25ae4e9e4d8948d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 14 Nov 2024 15:50:52 +0900 Subject: [PATCH 3/5] jd: only set transport credentials if actually set --- deployment/environment/devenv/jd.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/deployment/environment/devenv/jd.go b/deployment/environment/devenv/jd.go index 9af8412d61e..48150340cae 100644 --- a/deployment/environment/devenv/jd.go +++ b/deployment/environment/devenv/jd.go @@ -45,8 +45,9 @@ func authTokenInterceptor(source oauth2.TokenSource) grpc.UnaryClientInterceptor } func NewJDConnection(cfg JDConfig) (*grpc.ClientConn, error) { - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(cfg.Creds), + opts := []grpc.DialOption{} + if cfg.Creds != nil { + opts = append(opts, grpc.WithTransportCredentials(cfg.Creds)) } if cfg.Auth != nil { opts = append(opts, grpc.WithUnaryInterceptor(authTokenInterceptor(cfg.Auth))) From cdbd5e455fa826b9f367110eaab79f5ee6858f94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 14 Nov 2024 15:51:48 +0900 Subject: [PATCH 4/5] web/client: Clearer error if feeds manager fails to create --- deployment/environment/web/sdk/client/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deployment/environment/web/sdk/client/client.go b/deployment/environment/web/sdk/client/client.go index 011eb0cce31..74d18c13acb 100644 --- a/deployment/environment/web/sdk/client/client.go +++ b/deployment/environment/web/sdk/client/client.go @@ -202,6 +202,10 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu feedsManager := success.GetFeedsManager() return feedsManager.GetId(), nil } + if err, ok := response.GetCreateFeedsManager().(*generated.CreateFeedsManagerCreateFeedsManagerSingleFeedsManagerError); ok { + msg := err.GetMessage() + return "", fmt.Errorf("failed to create feeds manager: %v", msg) + } return "", fmt.Errorf("failed to create feeds manager") } From 2bca70c0e015f0ea7648709bca9df98fa3e2cc1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 14 Nov 2024 23:39:49 +0900 Subject: [PATCH 5/5] Add some idempotency and robustness to the node registration --- deployment/environment/devenv/don.go | 33 +++++++++++++++---- .../environment/web/sdk/client/client.go | 2 +- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/deployment/environment/devenv/don.go b/deployment/environment/devenv/don.go index 92751ca94f6..05a3d5bea08 100644 --- a/deployment/environment/devenv/don.go +++ b/deployment/environment/devenv/don.go @@ -335,8 +335,30 @@ func (n *Node) RegisterNodeToJobDistributor(ctx context.Context, jd JobDistribut Labels: n.labels, Name: n.Name, }) - - if err != nil { + // node already registered, fetch it's id + // TODO: check for rpc code = "AlreadyExists" instead + if err != nil && strings.Contains(err.Error(), "AlreadyExists") { + nodesResponse, err := jd.ListNodes(ctx, &nodev1.ListNodesRequest{ + Filter: &nodev1.ListNodesRequest_Filter{ + Selectors: []*ptypes.Selector{ + { + Key: "p2p_id", + Op: ptypes.SelectorOp_EQ, + Value: peerID, + }, + }, + }, + }) + if err != nil { + return err + } + nodes := nodesResponse.GetNodes() + if len(nodes) == 0 { + return fmt.Errorf("failed to find node: %v", n.Name) + } + n.NodeId = nodes[0].Id + return nil + } else if err != nil { return fmt.Errorf("failed to register node %s: %w", n.Name, err) } if registerResponse.GetNode().GetId() == "" { @@ -367,13 +389,12 @@ func (n *Node) CreateJobDistributor(ctx context.Context, jd JobDistributor) (str func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor) error { // register the node in the job distributor err := n.RegisterNodeToJobDistributor(ctx, jd) - // TODO: check for rpc code = "AlreadyExists" instead - if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { + if err != nil { return err } // now create the job distributor in the node id, err := n.CreateJobDistributor(ctx, jd) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "DuplicateFeedsManagerError") { return err } // wait for the node to connect to the job distributor @@ -382,7 +403,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor Id: n.NodeId, }) if err != nil { - return fmt.Errorf("failed to get node %s: %w", n.Name, err) + return retry.RetryableError(fmt.Errorf("failed to get node %s: %w", n.Name, err)) } if getRes.GetNode() == nil { return fmt.Errorf("no node found for node id %s", n.NodeId) diff --git a/deployment/environment/web/sdk/client/client.go b/deployment/environment/web/sdk/client/client.go index 74d18c13acb..5472591ef94 100644 --- a/deployment/environment/web/sdk/client/client.go +++ b/deployment/environment/web/sdk/client/client.go @@ -206,7 +206,7 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu msg := err.GetMessage() return "", fmt.Errorf("failed to create feeds manager: %v", msg) } - return "", fmt.Errorf("failed to create feeds manager") + return "", fmt.Errorf("failed to create feeds manager: %v", response.GetCreateFeedsManager().GetTypename()) } func (c *client) UpdateJobDistributor(ctx context.Context, id string, in JobDistributorInput) error {