diff --git a/internal/eru/agent/agent.go b/internal/eru/agent/agent.go index 79c2984..ee0ac5e 100644 --- a/internal/eru/agent/agent.go +++ b/internal/eru/agent/agent.go @@ -8,6 +8,7 @@ import ( "time" "github.com/alphadose/haxmap" + "github.com/cenkalti/backoff/v4" "github.com/cockroachdb/errors" "github.com/patrickmn/go-cache" "github.com/projecteru2/core/log" @@ -41,8 +42,10 @@ type Manager struct { } func NewManager( - ctx context.Context, svc service.Service, - config *types.Config, endpoint string, + ctx context.Context, + svc service.Service, + config *types.Config, + eruEndpoint string, t *testing.T, ) (*Manager, error) { logger := log.WithFunc("agent.NewManager") @@ -76,25 +79,9 @@ func NewManager( labels[parts[0]] = parts[1] } go func() { - // Core need to connect to the local grpc server, so sleep 30s here to wait local grpc server up - time.Sleep(30 * time.Second) - // try to register current node to eru core - if _, err := m.store.AddNode(ctx, &types.AddNodeOpts{ - Nodename: config.Hostname, - Endpoint: endpoint, - Podname: config.Podname, - Labels: labels, - }); err != nil { - e, ok := status.FromError(err) - if !ok { - logger.Error(ctx, err, "failed to add node") - return - } - if e.Code() == corerpc.AddNode && strings.Contains(e.Message(), "node already exists") { - logger.Infof(ctx, "node %s already exists", config.Hostname) - } else { - logger.Errorf(ctx, err, "failed to add node %s", config.Hostname) - } + if err := m.addCurrentNodeWithRetry(ctx, config, eruEndpoint, labels); err != nil { + logger.Errorf(ctx, err, "failed to register current node") + return } // update node's labels if necessary if len(labels) > 0 { @@ -154,3 +141,38 @@ func (m *Manager) Exit() error { } return nil } + +func (m *Manager) addCurrentNodeWithRetry( + ctx context.Context, + config *types.Config, + eruEndpoint string, + labels map[string]string, +) error { + logger := log.WithFunc("addCurrentNodeWithRetry").WithField("hostname", config.Hostname) + interval := 10 * time.Second + maxRetries := 10 + time.Sleep(interval) + bf := backoff.NewConstantBackOff(interval) + return backoff.Retry(func() error { + // try to register current node to eru core + if _, err := m.store.AddNode(ctx, &types.AddNodeOpts{ + Nodename: config.Hostname, + Endpoint: eruEndpoint, + Podname: config.Podname, + Labels: labels, + }); err != nil { + e, ok := status.FromError(err) + if !ok { + logger.Error(ctx, err, "failed to add node") + return err + } + if e.Code() == corerpc.AddNode && strings.Contains(e.Message(), "node already exists") { + logger.Infof(ctx, "node %s already exists", config.Hostname) + return nil + } + logger.Errorf(ctx, err, "failed to add node %s", config.Hostname) + return err + } + return nil + }, backoff.WithMaxRetries(bf, uint64(maxRetries))) +}