Skip to content

Commit

Permalink
project-flotta#118 ECOPROJECT-717 Implement registration-retry with …
Browse files Browse the repository at this point in the history
…exponential backoff
  • Loading branch information
gabriel-farache committed Apr 13, 2022
1 parent 1d2e58c commit 49602e3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 16 deletions.
67 changes: 56 additions & 11 deletions internal/registration/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

const (
retryAfter = 10
retryAfter = 10
maxInterval = 60
)

//go:generate mockgen -package=registration -destination=mock_deregistrable.go . Deregistrable
Expand All @@ -43,6 +44,7 @@ type Registration struct {
lock sync.RWMutex
deregistrables []Deregistrable
clientCert *ClientCert
nbRetry int
}

func NewRegistration(deviceID string, hardware *hardware2.Hardware, dispatcherClient DispatcherClient,
Expand All @@ -55,6 +57,7 @@ func NewRegistration(deviceID string, hardware *hardware2.Hardware, dispatcherCl
RetryAfter: retryAfter,
workloads: workloadsManager,
lock: sync.RWMutex{},
nbRetry: 0,
}
err := reg.CreateClientCerts()
if err != nil {
Expand Down Expand Up @@ -111,18 +114,54 @@ func (r *Registration) RegisterDevice() {
}

func (r *Registration) registerDeviceWithRetries(interval int64) {
ticker := time.NewTicker(time.Second * time.Duration(interval))
for range ticker.C {
if !r.config.IsInitialConfig() {
ticker.Stop()
break
currentInterval := interval
ticker := time.NewTicker(time.Second * time.Duration(currentInterval))
var registrationSuccess bool

for {
select {
case <-ticker.C:
registrationSuccess, ticker = tryRegister(currentInterval, r, ticker)
if registrationSuccess {
break
}

}
log.Infof("configuration has not been initialized yet. Sending registration request. DeviceID: %s;", r.deviceID)
err := r.registerDeviceOnce()
if err != nil {
log.Errorf("cannot register device. DeviceID: %s; err: %v", r.deviceID, err)

}

}

func tryRegister(currentInterval int64, r *Registration, ticker *time.Ticker) (bool, *time.Ticker) {
log.Debugf("Current interval: %d", currentInterval)
if !r.config.IsInitialConfig() {
ticker.Stop()
return true, ticker
}
log.Infof("configuration has not been initialized yet. Sending registration request. DeviceID: %s;", r.deviceID)
err := r.registerDeviceOnce()
if err != nil {
log.Errorf("cannot register device. DeviceID: %s; err: %v", r.deviceID, err)
}
ticker = incrementIntervalAndApply(&currentInterval, ticker)
r.nbRetry++

return false, ticker
}

func incrementIntervalAndApply(currentInterval *int64, ticker *time.Ticker) *time.Ticker {
interval := *currentInterval

if interval < maxInterval {
interval = interval * 2
if interval > maxInterval {
interval = maxInterval
}
ticker.Stop()
ticker = time.NewTicker(time.Duration(interval) * time.Second)
*currentInterval = interval
}
return ticker
}

func (r *Registration) registerDeviceOnce() error {
Expand Down Expand Up @@ -172,7 +211,7 @@ func (r *Registration) registerDeviceOnce() error {
var message models.MessageResponse
err = json.Unmarshal(parsedResponse.Body, &message)
if err != nil {
return fmt.Errorf("Cannot unmarshal registration response content: %v", err)
return fmt.Errorf("Cannot unmarshal registration response content: %v", err)
}

parsedContent, ok := message.Content.(map[string]interface{})
Expand Down Expand Up @@ -222,6 +261,12 @@ func (r *Registration) Deregister() error {
return errors
}

func (r *Registration) NbRetry() int {
r.lock.RLock()
defer r.lock.RUnlock()
return r.nbRetry
}

type YGGDResponse struct {
// StatusCode response
StatusCode int
Expand Down
11 changes: 6 additions & 5 deletions internal/registration/registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ var _ = Describe("Registration", func() {
Expect(content).To(Equal(clientCertPem))
})

It("Try to re-register", func() {
FIt("Try to re-register", func() {
// given
reg, err := registration.NewRegistration(deviceID, hw, dispatcherMock, configManager, wkManager)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -288,21 +288,22 @@ var _ = Describe("Registration", func() {
Certificate: string(clientCertPem),
})

reg.RetryAfter = 1

// then
dispatcherMock.EXPECT().Send(gomock.Any(), gomock.Any()).Return(
nil, fmt.Errorf("failed")).Times(1)
nil, fmt.Errorf("failed")).Times(4)
dispatcherMock.EXPECT().
Send(gomock.Any(), RegistrationMatcher()).
Return(&pb.Response{Response: msgResponse}, nil).
Times(1)

reg.RetryAfter = 1 //will do try then wait for 1 sec, 2 sec, 4 sec => 7sec in total for 4 attempts

// when
reg.RegisterDevice()

// then
Eventually(reg.IsRegistered, "5s").Should(BeTrue())
Eventually(reg.IsRegistered, "8s").Should(BeTrue())
Eventually(reg.NbRetry, "8s").Should(Equal(4))
})

})
Expand Down

0 comments on commit 49602e3

Please sign in to comment.